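Straight to the point: this post traces the source code, and the call chain, that deals with Volume mounting during Pod creation. I could not find any reference material for this part of the code, so the walkthrough is incomplete and may contain mistakes.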
syncPod
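Start from syncPod. As described in the previous post (the Pod creation flow), this method does some preparation before the Pod is created, such as creating the Pod's base directories. Near the end, it calls WaitForAttachAndMount to wait until the Volumes requested by the Pod have been attached and mounted, and then calls SyncPod to actually create the Pod. The source is as follows: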
// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// ...
// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
return err
}
}
// ...
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime so we get better errors
return err
}
return nil
}
Since the goal is to trace the Volume mounting process, step into WaitForAttachAndMount, which lives in pkg/kubelet/volumemanager/volume_manager.go.
WaitForAttachAndMount
The source is as follows:
func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
// Get the list of volumes the Pod expects to have mounted
expectedVolumes := getExpectedVolumes(pod)
if len(expectedVolumes) == 0 {
// No volumes to verify
return nil
}
glog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
// Get the Pod's unique identifier
uniquePodName := volumehelper.GetUniquePodName(pod)
// Some pods expect to have Setup called over and over again to update.
// Remount plugins for which this is true. (Atomically updating volumes,
// like Downward API, depend on this to update the contents of the volume).
vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
vm.actualStateOfWorld.MarkRemountRequired(uniquePodName)
// Block and poll until all of the Pod's expected volumes are mounted
err := wait.Poll(
podAttachAndMountRetryInterval,
podAttachAndMountTimeout,
vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
if err != nil {
// Timeout expired
unmountedVolumes :=
vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
if len(unmountedVolumes) == 0 {
return nil
}
return fmt.Errorf(
"timeout expired waiting for volumes to attach/mount for pod %q/%q. list of unattached/unmounted volumes=%v",
pod.Namespace,
pod.Name,
unmountedVolumes)
}
glog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
return nil
}
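Broadly, the purpose of this method is clear: wait until every Volume the Pod asked for has been mounted. The wait consists of four steps:
- Call getExpectedVolumes to get the names of all volumes the Pod expects to mount, returned as a string slice.
- Call desiredStateOfWorldPopulator.ReprocessPod to remove the Pod from the processed-pods list, forcing it to be processed again.
- Call actualStateOfWorld.MarkRemountRequired to flag the Volumes that are already mounted for this Pod and need to be remounted.
- Use wait.Poll to block the caller and check repeatedly; each check calls verifyVolumesMountedFunc to see whether all of the Pod's expected Volumes have been mounted.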
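The last of these steps is a plain "check, sleep, check again until a deadline" loop. The sketch below imitates it with the standard library only, to make the control flow concrete. pollUntil, errTimeout, and waitForMounts are hypothetical names invented for this illustration; the real code uses wait.Poll with podAttachAndMountRetryInterval and podAttachAndMountTimeout, and its exact timing behaviour may differ from this simplified version.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel error for this sketch.
var errTimeout = errors.New("timed out waiting for the condition")

// pollUntil calls cond repeatedly, sleeping interval between calls, until
// cond reports done, cond returns an error, or timeout has elapsed.
// It is a simplified stand-in for wait.Poll, not a copy of it.
func pollUntil(interval, timeout time.Duration, cond func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := cond()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return errTimeout
		}
		time.Sleep(interval)
	}
}

// waitForMounts simulates a pod whose volumes show up as mounted on the
// readyAfter-th check. A non-positive readyAfter means "never mounted".
func waitForMounts(readyAfter int) error {
	checks := 0
	cond := func() (bool, error) {
		checks++
		return readyAfter > 0 && checks >= readyAfter, nil
	}
	return pollUntil(time.Millisecond, 20*time.Millisecond, cond)
}

func main() {
	fmt.Println("mounted on third check:", waitForMounts(3))
	fmt.Println("never mounted:", waitForMounts(0))
}
```

The condition function is a closure, which is the same shape verifyVolumesMountedFunc returns: it captures the pod name and the expected volumes once, and is then called with no arguments on every tick.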
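First, getExpectedVolumes. Its body is trivial: it walks pod.Spec.Volumes and collects the names of all Volumes the Pod expects to mount, so I will not go into it here.
As for ReprocessPod and MarkRemountRequired, I do not fully understand them. They touch the desired state of the world (dsw) and the actual state of the world (asw), so I can only try to get the general idea.
Look at ReprocessPod first. It lives in pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go; the source is as follows: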
func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
podName volumetypes.UniquePodName) {
dswp.deleteProcessedPod(podName)
}
// deleteProcessedPod removes the specified pod from processedPods
func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
podName volumetypes.UniquePodName) {
dswp.pods.Lock()
defer dswp.pods.Unlock()
delete(dswp.pods.processedPods, podName)
}
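As you can see, the implementation is simple: it just removes the Pod from the processedPods map. The map itself is simple too; it maps a unique pod name (a string-based type) to a bool. I do not yet know exactly what happens after the Pod is removed from it; that is something I still need to study.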
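One plausible reading, and it is only an assumption since the populator loop is not shown here, is that processedPods acts as a "seen" set: a pod found in the set is skipped, so deleting it makes the next pass handle the pod again. The sketch below shows that pattern in isolation. The type processedPods and the methods process and reprocess are hypothetical names for this illustration, not the kubelet's own.

```go
package main

import (
	"fmt"
	"sync"
)

// processedPods is a hypothetical, trimmed-down "seen" set guarded by a mutex.
type processedPods struct {
	sync.Mutex
	seen map[string]bool
}

func newProcessedPods() *processedPods {
	return &processedPods{seen: make(map[string]bool)}
}

// process runs handle for podName unless the pod is already marked as
// processed, and reports whether handle ran.
func (p *processedPods) process(podName string, handle func()) bool {
	p.Lock()
	defer p.Unlock()
	if p.seen[podName] {
		return false
	}
	handle()
	p.seen[podName] = true
	return true
}

// reprocess forgets podName so that the next process call handles it again.
func (p *processedPods) reprocess(podName string) {
	p.Lock()
	defer p.Unlock()
	delete(p.seen, podName)
}

func main() {
	pods := newProcessedPods()
	handled := 0
	handle := func() { handled++ }

	pods.process("pod-a", handle) // handled
	pods.process("pod-a", handle) // skipped: already processed
	pods.reprocess("pod-a")
	pods.process("pod-a", handle) // handled again after reprocess
	fmt.Println("times handled:", handled)
}
```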
Next is MarkRemountRequired, which is a bit more involved. The source is as follows:
func (asw *actualStateOfWorld) MarkRemountRequired(
podName volumetypes.UniquePodName) {
asw.Lock()
defer asw.Unlock()
// attachedVolumes is a map containing the set of volumes the kubelet volume manager believes to be successfully attached to this node
for volumeName, volumeObj := range asw.attachedVolumes {
for mountedPodName, podObj := range volumeObj.mountedPods {
if mountedPodName != podName {
continue
}
volumePlugin, err :=
asw.volumePluginMgr.FindPluginBySpec(volumeObj.spec)
if err != nil || volumePlugin == nil {
// Log and continue processing
glog.Errorf(
"MarkRemountRequired failed to FindPluginBySpec for pod %q (podUid %q) volume: %q (volSpecName: %q)",
podObj.podName,
podObj.podUID,
volumeObj.volumeName,
volumeObj.spec.Name())
continue
}
if volumePlugin.RequiresRemount() {
podObj.remountRequired = true
asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
}
}
}
}
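It looks complicated, but the logic is simple. It takes asw.attachedVolumes, the set of Volumes already attached to this node, and iterates over it. For each Volume, it iterates over the Pods that have mounted that Volume; if the Pod is the current one it carries on, otherwise it moves to the next.
Having found a Volume mounted by this Pod, it calls FindPluginBySpec to find the plugin that supports that Volume, and then asks the plugin whether the Volume requires a remount. If it does, the method sets a flag by setting remountRequired to true, and that is all. I do not know what happens after the flag is set to true either.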
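One detail in MarkRemountRequired is easy to miss: after setting podObj.remountRequired, the code assigns podObj back into the map. That is needed because ranging over a map of struct values yields copies. The sketch below reproduces the nested loop with cut-down types to show it. All type and field names here are simplified stand-ins, requiresRemount replaces the call to volumePlugin.RequiresRemount(), and the volume names are made up for the example.

```go
package main

import "fmt"

// mountedPod is a simplified stand-in for the per-pod record kept by asw.
type mountedPod struct {
	podName         string
	remountRequired bool
}

// attachedVolume is a simplified stand-in for an attached volume record.
type attachedVolume struct {
	volumeName      string
	requiresRemount bool // stands in for volumePlugin.RequiresRemount()
	mountedPods     map[string]mountedPod
}

// markRemountRequired flags every volume mounted by podName whose plugin
// wants a remount. podObj is a copy of the map entry, so the modified
// copy has to be stored back into the map to take effect.
func markRemountRequired(volumes map[string]attachedVolume, podName string) {
	for volumeName, volumeObj := range volumes {
		for mountedPodName, podObj := range volumeObj.mountedPods {
			if mountedPodName != podName {
				continue
			}
			if volumeObj.requiresRemount {
				podObj.remountRequired = true
				volumes[volumeName].mountedPods[podName] = podObj
			}
		}
	}
}

// demoVolumes builds a small, made-up actual state for the example.
func demoVolumes() map[string]attachedVolume {
	return map[string]attachedVolume{
		"downward-api": {
			volumeName:      "downward-api",
			requiresRemount: true,
			mountedPods: map[string]mountedPod{
				"pod-a": {podName: "pod-a"},
				"pod-b": {podName: "pod-b"},
			},
		},
		"host-path": {
			volumeName:      "host-path",
			requiresRemount: false,
			mountedPods: map[string]mountedPod{
				"pod-a": {podName: "pod-a"},
			},
		},
	}
}

func main() {
	volumes := demoVolumes()
	markRemountRequired(volumes, "pod-a")
	for _, name := range []string{"downward-api", "host-path"} {
		fmt.Println(name, volumes[name].mountedPods["pod-a"].remountRequired)
	}
}
```

Without the write-back line the flag would be set on a temporary copy and lost at the end of the loop iteration.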
Then verifyVolumesMountedFunc is called repeatedly to decide whether all expected Volumes are mounted. Here is how it decides:
// verifyVolumesMountedFunc returns a method that returns true when all expected
// volumes are mounted.
func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
return func() (done bool, err error) {
return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
}
}
// getUnmountedVolumes fetches the current list of mounted volumes from
// the actual state of the world, and uses it to process the list of
// expectedVolumes. It returns a list of unmounted volumes.
func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
mountedVolumes := sets.NewString()
for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
}
return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
}
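This function calls getUnmountedVolumes to get the Volumes that the Pod expects but that are not yet mounted, and returns true only when that list is empty. Look at GetMountedVolumesForPod first; the source is as follows: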
func (asw *actualStateOfWorld) GetMountedVolumesForPod(
podName volumetypes.UniquePodName) []MountedVolume {
asw.RLock()
defer asw.RUnlock()
mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
for mountedPodName, podObj := range volumeObj.mountedPods {
if mountedPodName == podName {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
}
}
}
return mountedVolume
}
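This code is fairly clear. First, it takes a read lock, which indicates that it does not modify asw. The two nested for loops follow the same logic as before: go through asw.attachedVolumes, the Volumes attached to the node, then match by podName to pick the Volumes mounted by this Pod. The only difference is that it calls getMountedVolume to assemble each match into a new MountedVolume struct and returns them as a slice. Nothing more to say.
Now look at how filterUnmountedVolumes does the filtering; the source is as follows: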
// filterUnmountedVolumes adds each element of expectedVolumes that is not in
// mountedVolumes to a list of unmountedVolumes and returns it.
func filterUnmountedVolumes(mountedVolumes sets.String, expectedVolumes []string) []string {
unmountedVolumes := []string{}
for _, expectedVolume := range expectedVolumes {
if !mountedVolumes.Has(expectedVolume) {
unmountedVolumes = append(unmountedVolumes, expectedVolume)
}
}
return unmountedVolumes
}
The logic is simple here too; it is another loop. It iterates over expectedVolumes, and any Volume that is not in mountedVolumes is one that should be mounted but is not yet.
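The filtering is a set difference that keeps the order of the expected list. Here is a minimal standalone sketch of the same idea, using a plain map as the set instead of sets.String. The function name unmounted and the volume names are made up for the example.

```go
package main

import "fmt"

// unmounted returns the expected volume names that are missing from the
// mounted set, preserving the order of expected.
func unmounted(mounted map[string]struct{}, expected []string) []string {
	missing := []string{}
	for _, name := range expected {
		if _, ok := mounted[name]; !ok {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	mounted := map[string]struct{}{"config": {}, "data": {}}
	fmt.Println(unmounted(mounted, []string{"config", "data", "cache"}))
}
```

Returning an empty, non-nil slice when everything is mounted mirrors the original, where the caller only ever looks at the length of the result.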
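That is the end of WaitForAttachAndMount. All the code above only checks and decides; none of it performs an actual mount. In other words, the real operation of mounting a Volume for the Pod does not happen here, and by the time this method returns successfully the Volume has already been mounted on the host. The actual mounting presumably happens elsewhere in volumeManager, but I have not understood that part yet and need to put in more work.
I have not worked out how a Volume gets mounted for a Pod and still need to study it. From here on, the focus is only on how the Containers in the Pod get mounted onto those Volumes.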
SyncPod
Back in syncPod. After the wait above, it calls getPullSecretsForPod to fetch the Pod's pull secrets, which does not matter here. What matters is the final call to SyncPod. Its source lives in pkg/kubelet/kuberuntime/kuberuntime_manager.go:
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
// Check whether the Pod spec has changed and, if so, record the changes
podContainerChanges := m.computePodActions(pod, podStatus)
glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
}
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
}
}
// Step 2: Kill the pod if the sandbox has changed.
// Kill the pod whose sandbox has changed
if podContainerChanges.KillPod {
if !podContainerChanges.CreateSandbox {
glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
} else {
glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
}
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
result.AddPodSyncResult(killResult)
if killResult.Error() != nil {
glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
return
}
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(pod, podStatus)
}
} else {
// Step 3: kill any running containers in this pod which are not to keep.
// Kill the containers in the ContainersToKill list
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
result.AddSyncResult(killContainerResult)
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
return
}
}
}
// Keep terminated init containers fairly aggressively controlled
// This is an optimization because container removals are typically handled
// by container garbage collector.
// Remove some init containers to reduce the load on the container garbage collector
m.pruneInitContainersBeforeStart(pod, podStatus)
// We pass the value of the podIP down to generatePodSandboxConfig and
// generateContainerConfig, which in turn passes it to various other
// functions, in order to facilitate functionality that requires this
// value (hosts file and downward API) and avoid races determining
// the pod IP in cases where a container requires restart but the
// podIP isn't in the status manager yet.
//
// We default to the IP in the passed-in pod status, and overwrite it if the
// sandbox needs to be (re)started.
podIP := ""
if podStatus != nil {
podIP = podStatus.IP
}
// Step 4: Create a sandbox for the pod if necessary.
// Create the sandbox for the pod, if needed
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
var msg string
var err error
glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
result.AddSyncResult(createSandboxResult)
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
if err != nil {
createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox.")
return
}
glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
if err != nil {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
result.Fail(err)
return
}
// If we ever allow updating a pod from non-host-network to
// host-network, we may use a stale IP.
if !kubecontainer.IsHostNetworkPod(pod) {
// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
}
}
// Get podSandboxConfig for containers to start.
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
result.AddSyncResult(configPodSandboxResult)
// Generate the sandbox config, such as the pod's DNS, hostname, and port mappings
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
if err != nil {
message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
glog.Error(message)
configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
return
}
// Step 5: start the init container.
// Start the init container
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
return
}
glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
// Start it
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
startContainerResult.Fail(err, msg)
utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
return
}
// Successfully started the container; clear the entry in the failure
glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}
// Step 6: start containers in podContainerChanges.ContainersToStart.
// Start the main containers
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
continue
}
glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
// Start it
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
glog.V(3).Infof("container start failed: %v: %s", err, msg)
default:
utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
}
continue
}
}
return
}
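As mentioned before, this method first calls computePodActions to work out what has changed relative to the Pod spec, which in practice is a set of decisions about which actions need to be taken. The result is recorded in podContainerChanges, a podActions struct: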
// podActions keeps information what to do for a pod.
type podActions struct {
KillPod bool
CreateSandbox bool
SandboxID string
Attempt uint32
NextInitContainerToStart *v1.Container
ContainersToStart []int
ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo
}
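After that call comes a long run of if-else branches, because the actions to take are chosen according to podActions. Since we are creating a Pod, focus only on the ContainersToStart part: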
// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
continue
}
glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
glog.V(3).Infof("container start failed: %v: %s", err, msg)
default:
utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
}
continue
}
}
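As you can see, the core method here is startContainer. It first generates some configuration for the container and then starts it; the Volume mounting we are after happens inside.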
startContainer
Step into the method. It lives in pkg/kubelet/kuberuntime/kuberuntime_container.go:
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string) (string, error) {
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
if err != nil {
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return msg, err
}
// Step 2: create the container.
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
}
glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
// For a new container, the RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
}
containerConfig, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef)
if err != nil {
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), ErrCreateContainerConfig
}
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), ErrCreateContainer
}
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", err)
return "Internal PreStartContainer hook failed", err
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")
if ref != nil {
m.containerRefManager.SetRef(kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}, ref)
}
// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)
if err != nil {
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")
// Symlink container logs to the legacy container log location for cluster logging
// support.
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
legacySymlink, containerID, containerLog, err)
}
// Step 4: execute the post start hook.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
}
return msg, ErrPostStartHook
}
}
return "", nil
}
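The method has three main parts (the code comments list a fourth, running the post-start hook, which is not relevant here):
- pull the container image;
- create the container;
- start the container.
The core is the second part, creating the container. Looking only at the key steps: it first calls generateContainerConfig to produce the container's configuration, and then passes that configuration to CreateContainer to create the container.

Let's go through them one by one.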
generateContainerConfig
The method lives in pkg/kubelet/kuberuntime/kuberuntime_container.go; the source is as follows:
// generateContainerConfig generates container config for kubelet runtime v1.
func (m *kubeGenericRuntimeManager) generateContainerConfig(container *v1.Container, pod *v1.Pod, restartCount int, podIP, imageRef string) (*runtimeapi.ContainerConfig, error) {
opts, err := m.runtimeHelper.GenerateRunContainerOptions(pod, container, podIP)
if err != nil {
return nil, err
}
uid, username, err := m.getImageUser(container.Image)
if err != nil {
return nil, err
}
// Verify RunAsNonRoot. Non-root verification only supports numeric user.
if err := verifyRunAsNonRoot(pod, container, uid, username); err != nil {
return nil, err
}
command, args := kubecontainer.ExpandContainerCommandAndArgs(container, opts.Envs)
containerLogsPath := buildContainerLogsPath(container.Name, restartCount)
restartCountUint32 := uint32(restartCount)
config := &runtimeapi.ContainerConfig{
Metadata: &runtimeapi.ContainerMetadata{
Name: container.Name,
Attempt: restartCountUint32,
},
Image: &runtimeapi.ImageSpec{Image: imageRef},
Command: command,
Args: args,
WorkingDir: container.WorkingDir,
Labels: newContainerLabels(container, pod),
Annotations: newContainerAnnotations(container, pod, restartCount),
Devices: makeDevices(opts),
Mounts: m.makeMounts(opts, container),
LogPath: containerLogsPath,
Stdin: container.Stdin,
StdinOnce: container.StdinOnce,
Tty: container.TTY,
Linux: m.generateLinuxContainerConfig(container, pod, uid, username),
}
// set environment variables
envs := make([]*runtimeapi.KeyValue, len(opts.Envs))
for idx := range opts.Envs {
e := opts.Envs[idx]
envs[idx] = &runtimeapi.KeyValue{
Key: e.Name,
Value: e.Value,
}
}
config.Envs = envs
return config, nil
}
The method first calls GenerateRunContainerOptions to produce opts, a RunContainerOptions struct with the following fields:
type RunContainerOptions struct {
Envs []EnvVar
Mounts []Mount
Devices []DeviceInfo
PortMappings []PortMapping
PodContainerDir string
CgroupParent string
ReadOnly bool
Hostname string
EnableHostUserNamespace bool
}
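Among them is Mounts, the mount-related field we are after. Once opts is generated, the method calls getImageUser to get the image's user. It then calls ExpandContainerCommandAndArgs to get the container's command and arguments, and buildContainerLogsPath to build the log file path. Finally, it uses these values to build a ContainerConfig value, which is the container's configuration.
The field to watch in that struct is Mounts, which determines the Volume mounts when the container is created. It is assigned the mount list produced by m.makeMounts(opts, container).
First see how opts is generated, then come back to m.makeMounts.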
GenerateRunContainerOptions
The method lives in pkg/kubelet/kubelet_pods.go; the source is as follows:
// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
// Create an empty opts (RunContainerOptions)
opts, err := kl.containerManager.GetResources(pod, container)
if err != nil {
return nil, err
}
// Get the Pod's cgroup parent
cgroupParent := kl.GetPodCgroupParent(pod)
opts.CgroupParent = cgroupParent
// Generate the hostname and hostDomainName for the Pod
hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
if err != nil {
return nil, err
}
opts.Hostname = hostname
// podName is the Pod's unique identifier
podName := volumehelper.GetUniquePodName(pod)
// Get the Volumes mounted for the pod
volumes := kl.volumeManager.GetMountedVolumesForPod(podName)
// Build the container's port mappings
opts.PortMappings = kubecontainer.MakePortMappings(container)
// TODO(random-liu): Move following convert functions into pkg/kubelet/container
devices, err := kl.makeGPUDevices(pod, container)
if err != nil {
return nil, err
}
opts.Devices = append(opts.Devices, devices...)
// TODO: remove feature gate check after no longer needed
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
blkutil := volumeutil.NewBlockVolumePathHandler()
blkVolumes, err := kl.makeBlockVolumes(pod, container, volumes, blkutil)
if err != nil {
return nil, err
}
opts.Devices = append(opts.Devices, blkVolumes...)
}
// Build the mount list (first pass)
mounts, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
if err != nil {
return nil, err
}
opts.Mounts = append(opts.Mounts, mounts...)
// Build the environment variables
envs, err := kl.makeEnvironmentVariables(pod, container, podIP)
if err != nil {
return nil, err
}
opts.Envs = append(opts.Envs, envs...)
// Disabling adding TerminationMessagePath on Windows as these files would be mounted as docker volume and
// Docker for Windows has a bug where only directories can be mounted
if len(container.TerminationMessagePath) != 0 && runtime.GOOS != "windows" {
// Create the container's directory
p := kl.getPodContainerDir(pod.UID, container.Name)
if err := os.MkdirAll(p, 0750); err != nil {
glog.Errorf("Error on creating %q: %v", p, err)
} else {
opts.PodContainerDir = p
}
}
// only do this check if the experimental behavior is enabled, otherwise allow it to default to false
// If required, let the containers in the Pod share the host's user namespace
if kl.experimentalHostUserNamespaceDefaulting {
opts.EnableHostUserNamespace = kl.enableHostUserNamespace(pod)
}
return opts, nil
}
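As you can see, the method fills in the Options fields one at a time. The flow is:
- Call GetResources to create an empty opts (a RunContainerOptions value); stepping into this method shows that it just returns a pointer to an empty struct;
- Call GetPodCgroupParent to get the Pod's cgroup parent;
- Call GeneratePodHostNameAndDomain to generate hostname and hostDomainName for the Pod;
- Call GetUniquePodName to get the Pod's unique identifier;
- Call GetMountedVolumesForPod to get the Volumes mounted for the Pod. Note that this method belongs to volumeManager, but internally it gets its information by calling the asw.GetMountedVolumesForPod shown earlier, so it is not expanded again here;
- Call MakePortMappings to build the container's port mappings;
- Do some device-related configuration;
- Call makeMounts from the kubelet package to build a first version of the container's mount list;
- Call makeEnvironmentVariables to build the environment variables;
- Create the container's directory under the Pod's directory, which is just path joining and directory creation;
- If required, let the containers in the Pod share the host's user namespace.
Of these, the operation related to volume mounts inside the container is the eighth step, the call to makeMounts, so dig into makeMounts to see how the mount list is built.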
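The hand-off between the fifth and eighth steps can be pictured as a lookup by volume name: the pod-level volumes go in as a map, and each mount the container requests is resolved against it. The sketch below is a deliberately small model of that idea. The types mount and volumeRequest, the function buildMounts, and the paths are all hypothetical; the real makeMounts works on kubecontainer.VolumeMap and does considerably more.

```go
package main

import "fmt"

// mount is a cut-down stand-in for kubecontainer.Mount.
type mount struct {
	Name          string
	ContainerPath string
	HostPath      string
}

// volumeRequest is a cut-down stand-in for one entry of a container's
// VolumeMounts: which volume, and where to mount it inside the container.
type volumeRequest struct {
	Name      string
	MountPath string
}

// buildMounts resolves each request against podVolumes (volume name ->
// host path) and fails if a requested volume is not among the mounted ones.
func buildMounts(requests []volumeRequest, podVolumes map[string]string) ([]mount, error) {
	mounts := []mount{}
	for _, r := range requests {
		hostPath, ok := podVolumes[r.Name]
		if !ok {
			return nil, fmt.Errorf("cannot find volume %q to mount into container", r.Name)
		}
		mounts = append(mounts, mount{Name: r.Name, ContainerPath: r.MountPath, HostPath: hostPath})
	}
	return mounts, nil
}

func main() {
	podVolumes := map[string]string{"data": "/host/volumes/data"}

	mounts, err := buildMounts([]volumeRequest{{Name: "data", MountPath: "/var/lib/app"}}, podVolumes)
	fmt.Println(mounts, err)

	_, err = buildMounts([]volumeRequest{{Name: "missing", MountPath: "/mnt"}}, podVolumes)
	fmt.Println(err)
}
```

This is also why the wait in WaitForAttachAndMount matters: a volume that is not yet in the mounted set at this point makes the whole container configuration fail.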
makeMounts (kubelet package)
The method lives in pkg/kubelet/kubelet_pods.go; the source is as follows:
// Determine the container's mount points
// makeMounts determines the mount points for the given container.
func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
// Kubernetes only mounts on /etc/hosts if:
// - container is not an infrastructure (pause) container
// - container is not already mounting on /etc/hosts
// - OS is not Windows
// Kubernetes will not mount /etc/hosts if:
// - when the Pod sandbox is being created, its IP is still unknown. Hence, PodIP will not have been set.
mountEtcHostsFile := len(podIP) > 0 && runtime.GOOS != "windows"
glog.V(3).Infof("container: %v/%v/%v podIP: %q creating hosts mount: %v", pod.Namespace, pod.Name, container.Name, podIP, mountEtcHostsFile)
// Create an empty mount list
mounts := []kubecontainer.Mount{}
for _, mount := range container.VolumeMounts {
// do not mount /etc/hosts if container is already mounting on the path
mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath)
vol, ok := podVolumes[mount.Name]
if !ok || vol.Mounter == nil {
glog.Errorf("Mount cannot be satisfied for container %q, because the volume is missing or the volume mounter is nil: %+v", container.Name, mount)
return nil, fmt.Errorf("cannot find volume %q to mount into container %q", mount.Name, container.Name)
}
relabelVolume := false
// If the volume supports SELinux and it has not been
// relabeled already and it is not a read-only volume,
// relabel it and mark it as labeled
if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux && !vol.SELinuxLabeled {
vol.SELinuxLabeled = true
relabelVolume = true
}
hostPath, err := volume.GetPath(vol.Mounter)
if err != nil {
return nil, err
}
if mount.SubPath != "" {
if filepath.IsAbs(mount.SubPath) {
return nil, fmt.Errorf("error SubPath `%s` must not be an absolute path", mount.SubPath)
}
err = volumevalidation.ValidatePathNoBacksteps(mount.SubPath)
if err != nil {
return nil, fmt.Errorf("unable to provision SubPath `%s`: %v", mount.SubPath, err)
}
fileinfo, err := os.Lstat(hostPath)
if err != nil {
return nil, err
}
perm := fileinfo.Mode()
hostPath = filepath.Join(hostPath, mount.SubPath)
if subPathExists, err := utilfile.FileOrSymlinkExists(hostPath); err != nil {
glog.Errorf("Could not determine if subPath %s exists; will not attempt to change its permissions", hostPath)
} else if !subPathExists {
// Create the sub path now because if it's auto-created later when referenced, it may have an
// incorrect ownership and mode. For example, the sub path directory must have at least g+rwx
// when the pod specifies an fsGroup, and if the directory is not created here, Docker will
// later auto-create it with the incorrect mode 0750
if err := os.MkdirAll(hostPath, perm); err != nil {
glog.Errorf("failed to mkdir:%s", hostPath)
return nil, err
}
// chmod the sub path because umask may have prevented us from making the sub path with the same
// permissions as the mounter path
if err := os.Chmod(hostPath, perm); err != nil {
return nil, err
}
}
}
// Docker Volume Mounts fail on Windows if it is not of the form C:/
containerPath := mount.MountPath
if runtime.GOOS == "windows" {
if (strings.HasPrefix(hostPath, "/") || strings.HasPrefix(hostPath, "\\")) && !strings.Contains(hostPath, ":") {
hostPath = "c:" + hostPath
}
}
if !filepath.IsAbs(containerPath) {
containerPath = makeAbsolutePath(runtime.GOOS, containerPath)
}
propagation, err := translateMountPropagation(mount.MountPropagation)
if err != nil {
return nil, err
}
glog.V(5).Infof("Pod %q container %q mount %q has propagation %q", format.Pod(pod), container.Name, mount.Name, propagation)
mounts = append(mounts, kubecontainer.Mount{
Name: mount.Name,
ContainerPath: containerPath,
HostPath: hostPath,
ReadOnly: mount.ReadOnly,
SELinuxRelabel: relabelVolume,
Propagation: propagation,
})
}
if mountEtcHostsFile {
hostAliases := pod.Spec.HostAliases
hostsMount, err := makeHostsMount(podDir, podIP, hostName, hostDomain, hostAliases, pod.Spec.HostNetwork)
if err != nil {
return nil, err
}
mounts = append(mounts, *hostsMount)
}
return mounts, nil
}
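Before analyzing this source, first look at the VolumeMount struct, i.e. the entries of Pod.Spec.Containers[].VolumeMounts. Each one describes a volume mount requested by a container in the Pod: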
// VolumeMount describes a mounting of a Volume within a container.
type VolumeMount struct {
// This must match the Name of a Volume.
Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
// Mounted read-only if true, read-write otherwise (false or unspecified).
// Defaults to false.
// +optional
ReadOnly bool `json:"readOnly,omitempty" protobuf:"varint,2,opt,name=readOnly"`
// Path within the container at which the volume should be mounted. Must
// not contain ':'.
MountPath string `json:"mountPath" protobuf:"bytes,3,opt,name=mountPath"`
// Path within the volume from which the container's volume should be mounted.
// Defaults to "" (volume's root).
// +optional
SubPath string `json:"subPath,omitempty" protobuf:"bytes,4,opt,name=subPath"`
// mountPropagation determines how mounts are propagated from the host
// to container and the other way around.
// When not set, MountPropagationHostToContainer is used.
// This field is alpha in 1.8 and can be reworked or removed in a future
// release.
// +optional
MountPropagation *MountPropagationMode `json:"mountPropagation,omitempty" protobuf:"bytes,5,opt,name=mountPropagation,casttype=MountPropagationMode"`
}
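Here, Name is the name of the Volume the container wants to mount, and ReadOnly says whether the mount is read-only. MountPath is the mount point inside the container ("within the container", as the comment puts it). SubPath is a path relative to the Volume's root, that is, a subdirectory on the host. It is empty by default, which means the container mounts the Volume's root directory directly.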
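Back to the method. Its main flow is as follows:
- Create an empty mount mapping list, mounts;
- For each mount requested by the container (the VolumeMount above), do the following:
  - Look up the details of the Volume to be mounted in podVolumes;
  - Call GetPath to obtain the Volume's root path, i.e. hostPath;
  - Run the necessary checks on hostPath and subPath, then join them into the new mount path. For the checks, the join, and the related vulnerability, see an earlier post: CVE-2017;
  - With the host-side path in hand, work out the path to mount inside the container. A few simple path operations yield the absolute in-container path, containerPath;
  - From the volume name, the hostPath joined with subPath, the in-container path containerPath, the readOnly flag, and two further attributes not discussed here (SELinuxRelabel and Propagation), build a complete mount mapping and append it to mounts.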
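Once every VolumeMount has been processed, the container's mount mapping list is complete and the method returns normally. As the analysis above shows, the core of this method is to produce the mapping between host paths and container paths, plus a few attributes, and pass it on as configuration. It never reaches into the OS kernel to perform an actual mount.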
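The subPath handling in the flow above can be sketched in isolation. This is a minimal sketch, not the kubelet's code: resolveSubPath is a hypothetical helper, it uses forward-slash paths only, and it does not resolve symlinks.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// resolveSubPath is a hypothetical helper (not a kubelet function). It mirrors
// the checks described above: reject an absolute subPath, reject any ".."
// component, then join the subPath onto the volume's root path.
func resolveSubPath(volumeRoot, subPath string) (string, error) {
	if subPath == "" {
		// Empty subPath means "mount the volume root".
		return volumeRoot, nil
	}
	if path.IsAbs(subPath) {
		return "", fmt.Errorf("subPath %q must not be an absolute path", subPath)
	}
	for _, part := range strings.Split(subPath, "/") {
		if part == ".." {
			return "", fmt.Errorf("subPath %q must not contain '..'", subPath)
		}
	}
	return path.Join(volumeRoot, subPath), nil
}

func main() {
	// The volume root below is an illustrative value, not a real kubelet path.
	root := "/var/lib/example/volumes/vol"
	for _, sp := range []string{"", "logs/app", "/etc", "../escape"} {
		p, err := resolveSubPath(root, sp)
		fmt.Printf("subPath=%q path=%q err=%v\n", sp, p, err)
	}
}
```

Only the two accepted inputs produce a host path. The other two are rejected before any join happens, which is the point of validating first.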
makeMounts (kuberuntime package)
Once opts has been generated, the final configuration, config, can be built from it:
config := &runtimeapi.ContainerConfig{
Metadata: &runtimeapi.ContainerMetadata{
Name: container.Name,
Attempt: restartCountUint32,
},
Image: &runtimeapi.ImageSpec{Image: imageRef},
Command: command,
Args: args,
WorkingDir: container.WorkingDir,
Labels: newContainerLabels(container, pod),
Annotations: newContainerAnnotations(container, pod, restartCount),
Devices: makeDevices(opts),
Mounts: m.makeMounts(opts, container),
LogPath: containerLogsPath,
Stdin: container.Stdin,
StdinOnce: container.StdinOnce,
Tty: container.TTY,
Linux: m.generateLinuxContainerConfig(container, pod, uid, username),
}
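As you can see, config.Mounts does not simply reference the opts.Mounts discussed above; it goes through another round of processing. In other words, the earlier mount mapping list was only preliminary and is adjusted here.
Now look at makeMounts, located in pkg/kubelet/kuberuntime/kuberuntime_container.go: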
// makeMounts generates container volume mounts for kubelet runtime v1.
func (m *kubeGenericRuntimeManager) makeMounts(opts *kubecontainer.RunContainerOptions, container *v1.Container) []*runtimeapi.Mount {
volumeMounts := []*runtimeapi.Mount{}
for idx := range opts.Mounts {
v := opts.Mounts[idx]
selinuxRelabel := v.SELinuxRelabel && selinux.SELinuxEnabled()
mount := &runtimeapi.Mount{
HostPath: v.HostPath,
ContainerPath: v.ContainerPath,
Readonly: v.ReadOnly,
SelinuxRelabel: selinuxRelabel,
Propagation: v.Propagation,
}
volumeMounts = append(volumeMounts, mount)
}
// The reason we create and mount the log file in here (not in kubelet) is because
// the file's location depends on the ID of the container, and we need to create and
// mount the file before actually starting the container.
if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 {
// Because the PodContainerDir contains pod uid and container name which is unique enough,
// here we just add a random id to make the path unique for different instances
// of the same container.
cid := makeUID()
containerLogPath := filepath.Join(opts.PodContainerDir, cid)
fs, err := m.osInterface.Create(containerLogPath)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error on creating termination-log file %q: %v", containerLogPath, err))
} else {
fs.Close()
// Chmod is needed because ioutil.WriteFile() ends up calling
// open(2) to create the file, so the final mode used is "mode &
// ~umask". But we want to make sure the specified mode is used
// in the file no matter what the umask is.
if err := m.osInterface.Chmod(containerLogPath, 0666); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to set termination-log file permissions %q: %v", containerLogPath, err))
}
selinuxRelabel := selinux.SELinuxEnabled()
volumeMounts = append(volumeMounts, &runtimeapi.Mount{
HostPath: containerLogPath,
ContainerPath: container.TerminationMessagePath,
SelinuxRelabel: selinuxRelabel,
})
}
}
return volumeMounts
}
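This method does just two things:
- It rebuilds the earlier mount mapping list. In practice this means dropping the Name field and keeping SELinux relabeling only when SELinux is enabled on the host;
- It calls Create on the OS interface to create a file at PodContainerDir + cid (a file, not a directory). The file is then mapped to the container's termination message path (TerminationMessagePath), which yields a new mount that is appended to the list. This mount exists only to carry the termination message.
After rebuilding the mount list, the method simply returns it.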
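The second step, creating the file and then forcing its mode, can be tried on its own. This is a rough sketch under assumptions: createTerminationLog and demo are hypothetical names, the instance ID is a fixed illustrative string rather than a generated one, and the standard os package stands in for the kubelet's OS interface.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// createTerminationLog is a hypothetical helper, not kubelet code. It creates
// an empty file named instanceID under podContainerDir and then forces its
// mode to 0666, so the result does not depend on the process umask.
func createTerminationLog(podContainerDir, instanceID string) (string, error) {
	logPath := filepath.Join(podContainerDir, instanceID)
	f, err := os.Create(logPath)
	if err != nil {
		return "", err
	}
	if err := f.Close(); err != nil {
		return "", err
	}
	// The mode applied at creation time is masked by umask; Chmod is not.
	if err := os.Chmod(logPath, 0666); err != nil {
		return "", err
	}
	return logPath, nil
}

// demo runs the helper inside a throwaway temp directory and reports the
// permission bits of the file it created.
func demo() (os.FileMode, error) {
	dir, err := os.MkdirTemp("", "pod-container-dir")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	logPath, err := createTerminationLog(dir, "0a1b2c3d")
	if err != nil {
		return 0, err
	}
	info, err := os.Stat(logPath)
	if err != nil {
		return 0, err
	}
	return info.Mode().Perm(), nil
}

func main() {
	perm, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("termination log mode: %v\n", perm)
}
```

The explicit Chmod is what makes the mode predictable. Without it, a restrictive umask could leave the file unwritable for a container process running under a different user.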
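At this point generateContainerConfig is done: it has produced the configuration needed to create the container, containerConfig. startContainer then hands this configuration to CreateContainer, which starts creating the container.
This method is provided by the Docker service (dockerService). It creates the container and sets up its volume mounts, so let's follow it to see how the mounting is actually done.
CreateContainer (dockershim package)
This method is located in pkg/kubelet/dockershim/docker_container.go: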
// CreateContainer creates a new container in the given PodSandbox
// Docker cannot store the log to an arbitrary location (yet), so we create an
// symlink at LogPath, linking to the actual path of the log.
// TODO: check if the default values returned by the runtime API are ok.
func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
if config == nil {
return "", fmt.Errorf("container config is nil")
}
if sandboxConfig == nil {
return "", fmt.Errorf("sandbox config is nil for container %q", config.Metadata.Name)
}
labels := makeLabels(config.GetLabels(), config.GetAnnotations())
// Apply a the container type label.
labels[containerTypeLabelKey] = containerTypeLabelContainer
// Write the container log path in the labels.
labels[containerLogPathLabelKey] = filepath.Join(sandboxConfig.LogDirectory, config.LogPath)
// Write the sandbox ID in the labels.
labels[sandboxIDLabelKey] = podSandboxID
apiVersion, err := ds.getDockerAPIVersion()
if err != nil {
return "", fmt.Errorf("unable to get the docker API version: %v", err)
}
securityOptSep := getSecurityOptSeparator(apiVersion)
image := ""
if iSpec := config.GetImage(); iSpec != nil {
image = iSpec.Image
}
// Build the new configuration docker needs to create the container (createConfig) from the config passed in
createConfig := dockertypes.ContainerCreateConfig{
Name: makeContainerName(sandboxConfig, config),
Config: &dockercontainer.Config{
// TODO: set User.
Entrypoint: dockerstrslice.StrSlice(config.Command),
Cmd: dockerstrslice.StrSlice(config.Args),
Env: generateEnvList(config.GetEnvs()),
Image: image,
WorkingDir: config.WorkingDir,
Labels: labels,
// Interactive containers:
OpenStdin: config.Stdin,
StdinOnce: config.StdinOnce,
Tty: config.Tty,
// Disable Docker's health check until we officially support it
// (https://github.com/kubernetes/kubernetes/issues/25829).
Healthcheck: &dockercontainer.HealthConfig{
Test: []string{"NONE"},
},
},
HostConfig: &dockercontainer.HostConfig{
// Convert the mount mapping list generated earlier into a format docker understands => a string slice
Binds: generateMountBindings(config.GetMounts()),
},
}
hc := createConfig.HostConfig
ds.updateCreateConfig(&createConfig, config, sandboxConfig, podSandboxID, securityOptSep, apiVersion)
// Set devices for container.
devices := make([]dockercontainer.DeviceMapping, len(config.Devices))
for i, device := range config.Devices {
devices[i] = dockercontainer.DeviceMapping{
PathOnHost: device.HostPath,
PathInContainer: device.ContainerPath,
CgroupPermissions: device.Permissions,
}
}
hc.Resources.Devices = devices
securityOpts, err := ds.getSecurityOpts(config.GetLinux().GetSecurityContext().GetSeccompProfilePath(), securityOptSep)
if err != nil {
return "", fmt.Errorf("failed to generate security options for container %q: %v", config.Metadata.Name, err)
}
hc.SecurityOpt = append(hc.SecurityOpt, securityOpts...)
// Create the container through the docker client
createResp, err := ds.client.CreateContainer(createConfig)
if err != nil {
createResp, err = recoverFromCreationConflictIfNeeded(ds.client, createConfig, err)
}
if createResp != nil {
return createResp.ID, err
}
return "", err
}
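Leaving the other details of this method aside for now, we focus on its three core steps:
- The config generated earlier is not in docker's format, so a new createConfig is built from it; this is the configuration docker can actually read;
- Within it, createConfig.HostConfig.Binds is the list of volumes the container will mount. Docker cannot read the mount mapping list produced by makeMounts, so generateMountBindings is called to convert it into a format docker understands, a string slice;
- Once the configuration is ready, the docker client's CreateContainer method is called to create the container.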
First, look at how generateMountBindings works and what the final bind format looks like.
generateMountBindings
The method is located in pkg/kubelet/dockershim/helpers.go:
// generateMountBindings converts the mount list to a list of strings that
// can be understood by docker.
// '<HostPath>:<ContainerPath>[:options]', where 'options'
// is a comma-separated list of the following strings:
// 'ro', if the path is read only
// 'Z', if the volume requires SELinux relabeling
// propagation mode such as 'rslave'
func generateMountBindings(mounts []*runtimeapi.Mount) []string {
result := make([]string, 0, len(mounts))
for _, m := range mounts {
bind := fmt.Sprintf("%s:%s", m.HostPath, m.ContainerPath)
var attrs []string
if m.Readonly {
attrs = append(attrs, "ro")
}
// Only request relabeling if the pod provides an SELinux context. If the pod
// does not provide an SELinux context relabeling will label the volume with
// the container's randomly allocated MCS label. This would restrict access
// to the volume to the container which mounts it first.
if m.SelinuxRelabel {
attrs = append(attrs, "Z")
}
switch m.Propagation {
case runtimeapi.MountPropagation_PROPAGATION_PRIVATE:
// noop, private is default
case runtimeapi.MountPropagation_PROPAGATION_BIDIRECTIONAL:
attrs = append(attrs, "rshared")
case runtimeapi.MountPropagation_PROPAGATION_HOST_TO_CONTAINER:
attrs = append(attrs, "rslave")
default:
glog.Warningf("unknown propagation mode for hostPath %q", m.HostPath)
// Falls back to "private"
}
if len(attrs) > 0 {
bind = fmt.Sprintf("%s:%s", bind, strings.Join(attrs, ","))
}
result = append(result, bind)
}
return result
}
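The implementation is simple: it extracts three elements, the host path, the in-container path, and the mount attributes. Docker's bind format is just as simple: hostPath:containerPath:attrs, where the attrs part is omitted when there are no attributes.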
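To make the bind format concrete, here is a small self-contained sketch. The bindMount type and the toBind function are hypothetical simplifications: the propagation mode is a plain string here rather than the runtime API enum, and the paths are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// bindMount is a simplified, hypothetical stand-in for runtimeapi.Mount.
type bindMount struct {
	HostPath       string
	ContainerPath  string
	ReadOnly       bool
	SELinuxRelabel bool
	Propagation    string // "" (private), "rshared" or "rslave"
}

// toBind renders one mount as "<HostPath>:<ContainerPath>[:options]", where
// options is a comma-separated list built in the order ro, Z, propagation.
func toBind(m bindMount) string {
	bind := m.HostPath + ":" + m.ContainerPath
	var attrs []string
	if m.ReadOnly {
		attrs = append(attrs, "ro")
	}
	if m.SELinuxRelabel {
		attrs = append(attrs, "Z")
	}
	if m.Propagation != "" {
		attrs = append(attrs, m.Propagation)
	}
	if len(attrs) > 0 {
		bind += ":" + strings.Join(attrs, ",")
	}
	return bind
}

func main() {
	fmt.Println(toBind(bindMount{HostPath: "/data", ContainerPath: "/mnt"}))
	// prints "/data:/mnt"
	fmt.Println(toBind(bindMount{
		HostPath:      "/data",
		ContainerPath: "/mnt",
		ReadOnly:      true,
		Propagation:   "rslave",
	}))
	// prints "/data:/mnt:ro,rslave"
}
```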
Next, follow client.CreateContainer to see how the mount is carried out.
CreateContainer (libdocker package)
The method is located in pkg/kubelet/dockershim/libdocker/kube_docker_client.go:
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
// we provide an explicit default shm size as to not depend on docker daemon.
// TODO: evaluate exposing this as a knob in the API
if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
opts.HostConfig.ShmSize = defaultShmSize
}
createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &createResp, nil
}
As you can see, this method is implemented by calling client.ContainerCreate, so we follow that method directly.
ContainerCreate
The method is located in github.com/docker/docker/client/container_create.go:
// ContainerCreate creates a new container based in the given configuration.
// It can be associated with a name, but it's not mandatory.
func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) {
var response container.ContainerCreateCreatedBody
if err := cli.NewVersionError("1.25", "stop timeout"); config != nil && config.StopTimeout != nil && err != nil {
return response, err
}
// When using API 1.24 and under, the client is responsible for removing the container
if hostConfig != nil && versions.LessThan(cli.ClientVersion(), "1.25") {
hostConfig.AutoRemove = false
}
query := url.Values{}
if containerName != "" {
query.Set("name", containerName)
}
body := configWrapper{
Config: config,
HostConfig: hostConfig,
NetworkingConfig: networkingConfig,
}
serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
if err != nil {
if serverResp.statusCode == 404 && strings.Contains(err.Error(), "No such image") {
return response, imageNotFoundError{config.Image}
}
return response, err
}
err = json.NewDecoder(serverResp.body).Decode(&response)
ensureReaderClosed(serverResp)
return response, err
}
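The method has one core struct variable, body, which is the skeleton of the request body. Its three fields are roughly:
- Config: this is createConfig.Config, which holds most of the container's configuration;
- HostConfig: this is createConfig.HostConfig, whose Binds field is the mount mapping from earlier;
- NetworkingConfig: this is createConfig.NetworkingConfig, the container's network configuration.
Once built, the request body is sent to the docker API endpoint, so the container is created through a network request to docker.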
// post sends an http request to the docker API using the method POST with a specific Go context.
serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
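Following post any further leads into how docker wraps and sends this request. There is a lot more to say and organize there, so I plan to cover it in a separate post.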
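As a rough illustration of what ends up in that request body, the sketch below serializes a cut-down body and reads it back the way a receiver would. Everything here is an assumption for illustration: Config, HostConfig and createBody are hypothetical, heavily trimmed stand-ins for the docker types, and embedding the container config in the wrapper is my reading of how the fields end up at the top level of the JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config and HostConfig are hypothetical, trimmed stand-ins for the docker
// types of the same name; the real ones carry many more fields.
type Config struct {
	Image string
}

type HostConfig struct {
	Binds []string
}

// createBody is a hypothetical wrapper. Embedding *Config promotes its fields
// to the top level of the JSON object, next to the HostConfig key.
type createBody struct {
	*Config
	HostConfig *HostConfig
}

// buildBody serializes an image name and a list of bind strings.
func buildBody(image string, binds []string) ([]byte, error) {
	return json.Marshal(createBody{
		Config:     &Config{Image: image},
		HostConfig: &HostConfig{Binds: binds},
	})
}

// decodeBody reads the JSON back, as the receiving side would.
func decodeBody(body []byte) (string, []string, error) {
	var got struct {
		Image      string
		HostConfig struct{ Binds []string }
	}
	if err := json.Unmarshal(body, &got); err != nil {
		return "", nil, err
	}
	return got.Image, got.HostConfig.Binds, nil
}

func main() {
	body, err := buildBody("busybox", []string{"/data:/mnt:ro"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(body))

	image, binds, err := decodeBody(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(image, binds)
}
```

The bind strings travel as plain data inside HostConfig.Binds. Nothing on the kubelet side has mounted anything by the time the request is sent.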
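Summary
With that, the part of Pod creation that deals with container volume mounts is reasonably clear. It looks like a long chain of method calls, but it is really just layer upon layer of wrapping. In short, there are two main parts:
- Generate the configuration for creating the container, which includes the volume mount mappings;
- Wrap that configuration into a request body and call the docker API to create the container.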