k8s Source Code (2): Container Volume Mounting During Pod Creation


Straight to the point: this post aims to organize the source code related to Volume mounting during Pod creation and trace its call chain. I couldn't find much reference material for this part of the source, so the walkthrough may be incomplete and may contain mistakes.

syncPod

Let's start directly from syncPod. As described in the previous post (on the Pod creation flow), this method does some preparation before creating the Pod, such as creating the basic data directories. Near the end, it calls WaitForAttachAndMount to wait for the Volumes the Pod requests to be attached and mounted, and then calls SyncPod to actually start creating the Pod. The source is as follows:

// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
//   start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
//   already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // ...
	// Volume manager will not mount volumes for terminated pods
	if !kl.podIsTerminated(pod) {
		// Wait for volumes to attach/mount
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
			glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
			return err
		}
	}
    // ...
	// Call the container runtime's SyncPod callback
	result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	if err := result.Error(); err != nil {
		// Do not record an event here, as we keep all event logging for sync pod failures
		// local to container runtime so we get better errors
		return err
	}

	return nil
}

Since we want to trace the Volume mounting process, let's step into WaitForAttachAndMount, which lives in pkg/kubelet/volumemanager/volume_manager.go.

WaitForAttachAndMount

The source is as follows:

func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
    // Get the list of volumes the Pod expects to have mounted
	expectedVolumes := getExpectedVolumes(pod)
	if len(expectedVolumes) == 0 {
		// No volumes to verify
		return nil
	}

	glog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
    // Get the Pod's unique identifier
	uniquePodName := volumehelper.GetUniquePodName(pod)

	// Some pods expect to have Setup called over and over again to update.
	// Remount plugins for which this is true. (Atomically updating volumes,
	// like Downward API, depend on this to update the contents of the volume).
	vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
	vm.actualStateOfWorld.MarkRemountRequired(uniquePodName)

    // Block and poll until all of the Pod's expected volumes are mounted
	err := wait.Poll(
		podAttachAndMountRetryInterval,
		podAttachAndMountTimeout,
		vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))

	if err != nil {
		// Timeout expired
		unmountedVolumes :=
			vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
		if len(unmountedVolumes) == 0 {
			return nil
		}

		return fmt.Errorf(
			"timeout expired waiting for volumes to attach/mount for pod %q/%q. list of unattached/unmounted volumes=%v",
			pod.Namespace,
			pod.Name,
			unmountedVolumes)
	}

	glog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
	return nil
}

The purpose of this method is clear at a glance: wait until every Volume the Pod requested has been mounted. The waiting process breaks down into four steps:

  1. Call getExpectedVolumes to get the names of all volumes the Pod expects to mount, returned as a string slice.
  2. Call desiredStateOfWorldPopulator.ReprocessPod to remove the Pod from the processedPods list, forcing it to be reprocessed.
  3. Call actualStateOfWorld.MarkRemountRequired to flag volumes already mounted into the Pod that need to be remounted.
  4. Use wait.Poll to block and keep checking; each check calls verifyVolumesMountedFunc to see whether all of the Pod's expected Volumes have been mounted (see the sketch after this list).
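
To make the polling semantics concrete, here is a minimal, runnable sketch of how wait.Poll (from k8s.io/apimachinery/pkg/util/wait) drives a ConditionFunc. The simulated state and the short timeout are made up for illustration; if I recall correctly, the real kubelet uses podAttachAndMountRetryInterval (300ms) and podAttachAndMountTimeout (2 minutes):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	mounted := 0 // stand-in for the actual state of world making progress

	// wait.Poll retries the condition every interval until it returns
	// done=true, returns a non-nil error, or the timeout elapses
	// (in which case Poll returns wait.ErrWaitTimeout).
	err := wait.Poll(
		300*time.Millisecond, // podAttachAndMountRetryInterval in the kubelet
		3*time.Second,        // shortened stand-in for podAttachAndMountTimeout
		func() (done bool, err error) {
			mounted++ // pretend the reconciler mounted one more volume
			fmt.Println("volumes mounted so far:", mounted)
			return mounted >= 3, nil // done once everything expected is mounted
		})
	fmt.Println("poll finished, err =", err)
}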

First, getExpectedVolumes: its body is trivial; it just iterates over pod.Spec.Volumes to collect the names of all volumes expected to be mounted, so I won't go into detail.

As for ReprocessPod and MarkRemountRequired, I don't fully understand these two methods; they involve the dsw (desired state of world) and asw (actual state of world), so I can only try to sketch a rough picture.

First, ReprocessPod, located in pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go:

func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
	podName volumetypes.UniquePodName) {
	dswp.deleteProcessedPod(podName)
}

// deleteProcessedPod removes the specified pod from processedPods
func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
	podName volumetypes.UniquePodName) {
	dswp.pods.Lock()
	defer dswp.pods.Unlock()

	delete(dswp.pods.processedPods, podName)
}

As you can see, the implementation is trivial: it just deletes the Pod from the processedPods map, and the map itself is a simple <UniquePodName, bool> structure. As far as I can tell, the populator's periodic loop skips pods already present in this map, so deleting the entry forces the pod's volumes to be processed again on the next iteration; a minimal sketch of that pattern follows.
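
Here is a self-contained re-creation of that "skip if already processed" guard, with simplified types of my own; if my reading is right, the real populator loop (findAndAddNewPods → processPodVolumes) follows the same shape:

package main

import "fmt"

type populator struct {
	processedPods map[string]bool
}

func (p *populator) processPodVolumes(podName string) {
	if p.processedPods[podName] { // podPreviouslyProcessed in the real code
		return // already processed: the pod's volumes stay as they are
	}
	fmt.Println("adding volumes of", podName, "to the desired state of world")
	p.processedPods[podName] = true // markPodProcessed in the real code
}

func main() {
	p := &populator{processedPods: map[string]bool{}}
	p.processPodVolumes("default/web-0") // processed on the first iteration
	p.processPodVolumes("default/web-0") // skipped: entry exists in the map

	// ReprocessPod boils down to this delete; the populator's next
	// iteration then runs the full processing for the pod again.
	delete(p.processedPods, "default/web-0")
	p.processPodVolumes("default/web-0") // reprocessed
}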

Next is MarkRemountRequired, which is a bit more involved:

func (asw *actualStateOfWorld) MarkRemountRequired(
	podName volumetypes.UniquePodName) {
	asw.Lock()
	defer asw.Unlock()
    // attachedVolumes is a map containing the set of volumes the kubelet volume manager believes to be successfully attached to this node
	for volumeName, volumeObj := range asw.attachedVolumes {
		for mountedPodName, podObj := range volumeObj.mountedPods {
			if mountedPodName != podName {
				continue
			}

			volumePlugin, err :=
				asw.volumePluginMgr.FindPluginBySpec(volumeObj.spec)
			if err != nil || volumePlugin == nil {
				// Log and continue processing
				glog.Errorf(
					"MarkRemountRequired failed to FindPluginBySpec for pod %q (podUid %q) volume: %q (volSpecName: %q)",
					podObj.podName,
					podObj.podUID,
					volumeObj.volumeName,
					volumeObj.spec.Name())
				continue
			}

			if volumePlugin.RequiresRemount() {
				podObj.remountRequired = true
				asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
			}
		}
	}
}

Although it looks complex, the logic is simple. It takes asw.attachedVolumes, the list of Volumes already attached to this node, and iterates over it. For each Volume, it iterates over all Pods that have mounted it; if the Pod matches the current one it proceeds, otherwise it moves on to the next.

Once it finds a Volume mounted by this Pod, it uses FindPluginBySpec to locate the plugin that can handle that Volume, then asks the plugin whether the Volume needs remounting; if so, it simply sets the remountRequired field to true. From what I can tell, the reconciler later notices this flag and calls the mounter's SetUp again, which is how atomically-updated volumes such as the Downward API refresh their contents (this matches the comment in WaitForAttachAndMount above). A sketch of the plugin hook follows.
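
Below is a cut-down sketch of that hook with stand-in types of my own. In the real tree, RequiresRemount is part of the VolumePlugin interface in pkg/volume/plugin.go, and atomically-updated plugins such as downwardAPI (and, as far as I know, secret/configMap/projected) answer true:

package main

import "fmt"

// volumePlugin mimics the one method of pkg/volume.VolumePlugin we care about.
type volumePlugin interface {
	RequiresRemount() bool
}

// downwardAPIPlugin stands in for pkg/volume/downwardapi: its contents can
// change while the pod runs, so it must be periodically remounted.
type downwardAPIPlugin struct{}

func (p *downwardAPIPlugin) RequiresRemount() bool { return true }

// hostPathPlugin stands in for a plugin whose mount never needs refreshing.
type hostPathPlugin struct{}

func (p *hostPathPlugin) RequiresRemount() bool { return false }

func main() {
	plugins := map[string]volumePlugin{
		"downward-api": &downwardAPIPlugin{},
		"host-path":    &hostPathPlugin{},
	}
	for name, p := range plugins {
		// MarkRemountRequired sets podObj.remountRequired only when the
		// plugin answers true here; the reconciler then re-runs SetUp.
		fmt.Printf("%s: remount required = %v\n", name, p.RequiresRemount())
	}
}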

Next, verifyVolumesMountedFunc is called repeatedly to decide whether all expected Volumes have been mounted. Let's look at how it decides:

// verifyVolumesMountedFunc returns a method that returns true when all expected
// volumes are mounted.
func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
	return func() (done bool, err error) {
		return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
	}
}

// getUnmountedVolumes fetches the current list of mounted volumes from
// the actual state of the world, and uses it to process the list of
// expectedVolumes. It returns a list of unmounted volumes.
func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
	mountedVolumes := sets.NewString()
	for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
		mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
	}
	return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
}

This method calls getUnmountedVolumes to get the number of Volumes the Pod expects but has not yet mounted, and returns true only when that count is 0. First, GetMountedVolumesForPod:

func (asw *actualStateOfWorld) GetMountedVolumesForPod(
	podName volumetypes.UniquePodName) []MountedVolume {
	asw.RLock()
	defer asw.RUnlock()
	mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
	for _, volumeObj := range asw.attachedVolumes {
		for mountedPodName, podObj := range volumeObj.mountedPods {
			if mountedPodName == podName {
				mountedVolume = append(
					mountedVolume,
					getMountedVolume(&podObj, &volumeObj))
			}
		}
	}
	return mountedVolume
}

This code is fairly clear. First, it takes a read lock, which tells us it does not modify the asw. The two nested for loops follow exactly the same logic as before: get all Volumes attached to the node from asw.attachedVolumes, then match the ones mounted by this Pod by podName. The only difference is that it calls getMountedVolume to assemble each match into a new MountedVolume structure and returns them as a slice. Nothing surprising here.

Now let's see how filterUnmountedVolumes does the filtering:

// filterUnmountedVolumes adds each element of expectedVolumes that is not in
// mountedVolumes to a list of unmountedVolumes and returns it.
func filterUnmountedVolumes(mountedVolumes sets.String, expectedVolumes []string) []string {
	unmountedVolumes := []string{}
	for _, expectedVolume := range expectedVolumes {
		if !mountedVolumes.Has(expectedVolume) {
			unmountedVolumes = append(unmountedVolumes, expectedVolume)
		}
	}
	return unmountedVolumes
}

Again the logic is simple iteration: walk through expectedVolumes, and any Volume absent from mountedVolumes is one that should be mounted but has not been yet. A tiny worked example follows.
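
Here it is as a runnable snippet, using the same sets.String helper from k8s.io/apimachinery (the volume names are made up):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// What the actual state of world currently reports as mounted.
	mountedVolumes := sets.NewString("config", "data")
	// What pod.Spec.Volumes says should be mounted.
	expectedVolumes := []string{"config", "data", "tls-certs"}

	unmountedVolumes := []string{}
	for _, expected := range expectedVolumes {
		if !mountedVolumes.Has(expected) {
			unmountedVolumes = append(unmountedVolumes, expected)
		}
	}
	// Prints [tls-certs]: one volume still pending, so wait.Poll keeps going.
	fmt.Println(unmountedVolumes)
}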

That wraps up WaitForAttachAndMount. All of the code above only performs checks and bookkeeping; no actual mounting happens here. In other words, the real work of mounting Volumes for the Pod is not done in this path; by the time we get past this wait, the Volumes on the host are already mounted. The actual mounting should be carried out inside the volumeManager (by its reconciler), but I haven't fully worked through that part yet.

How Volumes get mounted for the Pod itself is still something I need to study. From here on, I'll focus only on how the Containers inside the Pod get these Volumes mounted into them.

SyncPod

Back in syncPod, after the wait above completes, getPullSecretsForPod is called to fetch the Pod's pull secrets, which doesn't matter here. What matters is the final call to SyncPod, located in pkg/kubelet/kuberuntime/kuberuntime_manager.go:

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	// Check whether the Pod Spec has changed and, if so, record those changes
	podContainerChanges := m.computePodActions(pod, podStatus)
	glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(legacyscheme.Scheme, pod)
		if err != nil {
			glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
		}
		if podContainerChanges.SandboxID != "" {
			m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
		} else {
			glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
		}
	}

	// Step 2: Kill the pod if the sandbox has changed.
	// Kill the pod if its sandbox has changed
	if podContainerChanges.KillPod {
		if !podContainerChanges.CreateSandbox {
			glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
		} else {
			glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
		}

		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		result.AddPodSyncResult(killResult)
		if killResult.Error() != nil {
			glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
			return
		}

		if podContainerChanges.CreateSandbox {
			m.purgeInitContainers(pod, podStatus)
		}
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
		// Kill the containers listed in ContainersToKill
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
				return
			}
		}
	}

	// Keep terminated init containers fairly aggressively controlled
	// This is an optmization because container removals are typically handled
	// by container garbage collector.
	// Prune init containers to reduce the load on the container garbage collector
	m.pruneInitContainersBeforeStart(pod, podStatus)

	// We pass the value of the podIP down to generatePodSandboxConfig and
	// generateContainerConfig, which in turn passes it to various other
	// functions, in order to facilitate functionality that requires this
	// value (hosts file and downward API) and avoid races determining
	// the pod IP in cases where a container requires restart but the
	// podIP isn't in the status manager yet.
	//
	// We default to the IP in the passed-in pod status, and overwrite it if the
	// sandbox needs to be (re)started.
	podIP := ""
	if podStatus != nil {
		podIP = podStatus.IP
	}

	// Step 4: Create a sandbox for the pod if necessary.
	// Create a sandbox for the pod, if necessary
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		if err != nil {
			createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
			glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
			ref, err := ref.GetReference(legacyscheme.Scheme, pod)
			if err != nil {
				glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox.")
			return
		}
		glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

		podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
		if err != nil {
			ref, err := ref.GetReference(legacyscheme.Scheme, pod)
			if err != nil {
				glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
			glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
			result.Fail(err)
			return
		}

		// If we ever allow updating a pod from non-host-network to
		// host-network, we may use a stale IP.
		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
			podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
			glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
		}
	}

	// Get podSandboxConfig for containers to start.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	// Generate the sandbox config, e.g. the pod's DNS, hostname, and port mappings
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		glog.Error(message)
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}

	// Step 5: start the init container.
	// Start the init container
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)
		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
			return
		}

		glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
		// Start it
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
			startContainerResult.Fail(err, msg)
			utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
			return
		}

		// Successfully started the container; clear the entry in the failure
		glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
	}

	// Step 6: start containers in podContainerChanges.ContainersToStart.
	// Start the main containers
	for _, idx := range podContainerChanges.ContainersToStart {
		container := &pod.Spec.Containers[idx]
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
			continue
		}

		glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
		// Start it
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				glog.V(3).Infof("container start failed: %v: %s", err, msg)
			default:
				utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
			}
			continue
		}
	}

	return
}

As mentioned before, this method first calls computePodActions to work out what has changed in the Pod Spec; in practice this amounts to a set of flags saying which actions need to be performed. These changes are recorded in podContainerChanges, a podActions structure:

// podActions keeps information what to do for a pod.
type podActions struct {
	KillPod bool
	CreateSandbox bool
	SandboxID string
	Attempt uint32
	NextInitContainerToStart *v1.Container
	ContainersToStart []int
	ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo
}

After that call comes the flurry of if-else branches you saw above, since the corresponding action has to be chosen based on podActions. Since we are creating a Pod, we'll focus only on the ContainersToStart part (re-quoted below, after an illustrative podActions value for the fresh-pod case):
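
For orientation, here is what computePodActions effectively decides for a brand-new pod whose containers have never started. The type below is a local mirror for illustration only (the real podActions is unexported), and the values are illustrative rather than captured from a run:

package main

import "fmt"

// podActions mirrored locally, with only the fields needed here.
type podActions struct {
	KillPod           bool
	CreateSandbox     bool
	SandboxID         string
	Attempt           uint32
	ContainersToStart []int
}

func main() {
	// A fresh two-container pod with no init containers: the sandbox must be
	// created (KillPod is set alongside CreateSandbox but is a no-op since
	// nothing is running yet), and every entry of pod.Spec.Containers is
	// scheduled to start.
	changes := podActions{
		KillPod:           true,
		CreateSandbox:     true,
		SandboxID:         "", // no existing sandbox
		Attempt:           0,  // first sandbox attempt
		ContainersToStart: []int{0, 1},
	}
	fmt.Printf("%+v\n", changes)
}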

// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
	container := &pod.Spec.Containers[idx]
	startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
	result.AddSyncResult(startContainerResult)

	isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
	if isInBackOff {
		startContainerResult.Fail(err, msg)
		glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
		continue
	}

	glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
	if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
		startContainerResult.Fail(err, msg)
		// known errors that are logged in other places are logged at higher levels here to avoid
		// repetitive log spam
		switch {
		case err == images.ErrImagePullBackOff:
			glog.V(3).Infof("container start failed: %v: %s", err, msg)
		default:
			utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
		}
		continue
	}
}

As you can see, the core method here is startContainer, which first generates some configuration for the container and then starts it; the Volume mounting process we're after lives inside it.

startContainer

Let's step into this method, located in pkg/kubelet/kuberuntime/kuberuntime_container.go:

// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string) (string, error) {
	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return msg, err
	}

	// Step 2: create the container.
	ref, err := kubecontainer.GenerateContainerRef(pod, container)
	if err != nil {
		glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
	}
	glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)

	// For a new container, the RestartCount should be 0
	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	if containerStatus != nil {
		restartCount = containerStatus.RestartCount + 1
	}

	containerConfig, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef)
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainerConfig
	}
	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainer
	}
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
	if err != nil {
		m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", err)
		return "Internal PreStartContainer hook failed", err
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")

	if ref != nil {
		m.containerRefManager.SetRef(kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}, ref)
	}

	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")

	// Symlink container logs to the legacy container log location for cluster logging
	// support.
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
		sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
	if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
		glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
			legacySymlink, containerID, containerLog, err)
	}

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
		if handlerErr != nil {
			m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
			if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
				glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
					container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
			}
			return msg, ErrPostStartHook
		}
	}

	return "", nil
}

This method breaks down into roughly three parts:

  1. Pull the container image;
  2. Create the container;
  3. Start the container.

The heart of it is step two, creating the container. Looking only at the core steps: it first calls generateContainerConfig to generate the container's configuration, then passes that configuration to CreateContainer to create the container.

Let's look at them one by one.

generateContainerConfig

This method is in pkg/kubelet/kuberuntime/kuberuntime_container.go:

// generateContainerConfig generates container config for kubelet runtime v1.
func (m *kubeGenericRuntimeManager) generateContainerConfig(container *v1.Container, pod *v1.Pod, restartCount int, podIP, imageRef string) (*runtimeapi.ContainerConfig, error) {
	opts, err := m.runtimeHelper.GenerateRunContainerOptions(pod, container, podIP)
	if err != nil {
		return nil, err
	}

	uid, username, err := m.getImageUser(container.Image)
	if err != nil {
		return nil, err
	}

	// Verify RunAsNonRoot. Non-root verification only supports numeric user.
	if err := verifyRunAsNonRoot(pod, container, uid, username); err != nil {
		return nil, err
	}

	command, args := kubecontainer.ExpandContainerCommandAndArgs(container, opts.Envs)
	containerLogsPath := buildContainerLogsPath(container.Name, restartCount)
	restartCountUint32 := uint32(restartCount)
	config := &runtimeapi.ContainerConfig{
		Metadata: &runtimeapi.ContainerMetadata{
			Name:    container.Name,
			Attempt: restartCountUint32,
		},
		Image:       &runtimeapi.ImageSpec{Image: imageRef},
		Command:     command,
		Args:        args,
		WorkingDir:  container.WorkingDir,
		Labels:      newContainerLabels(container, pod),
		Annotations: newContainerAnnotations(container, pod, restartCount),
		Devices:     makeDevices(opts),
		Mounts:      m.makeMounts(opts, container),
		LogPath:     containerLogsPath,
		Stdin:       container.Stdin,
		StdinOnce:   container.StdinOnce,
		Tty:         container.TTY,
		Linux:       m.generateLinuxContainerConfig(container, pod, uid, username),
	}

	// set environment variables
	envs := make([]*runtimeapi.KeyValue, len(opts.Envs))
	for idx := range opts.Envs {
		e := opts.Envs[idx]
		envs[idx] = &runtimeapi.KeyValue{
			Key:   e.Name,
			Value: e.Value,
		}
	}
	config.Envs = envs

	return config, nil
}

The method first calls GenerateRunContainerOptions to produce opts, a RunContainerOptions structure with the following fields:

type RunContainerOptions struct {
    Envs                    []EnvVar
    Mounts                  []Mount
    Devices                 []DeviceInfo
    PortMappings            []PortMapping
    PodContainerDir         string
    CgroupParent            string
    ReadOnly                bool
    Hostname                string
    EnableHostUserNamespace bool
}

This includes the mount-related field we care about: Mounts. After opts is generated, getImageUser is called to obtain the image's user. Then ExpandContainerCommandAndArgs produces the container command and arguments, and buildContainerLogsPath builds the log file path. Finally, these parameters are assembled into a ContainerConfig structure variable, which is the container configuration.

The field to watch in that structure is Mounts, which determines the Volume mounts at container creation time; it is assigned the mount mapping table produced by m.makeMounts(opts, container). For reference, the element type of that slice is shown below.
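
This is kubecontainer.Mount from pkg/kubelet/container/runtime.go, reproduced from memory of this era's tree and lightly abridged; it carries exactly the fields that the kubelet's makeMounts fills in later:

// Mount represents a volume mount, as handed to the runtime.
type Mount struct {
	// Name of the volume mount; matches a name in pod.Spec.Volumes.
	Name string
	// ContainerPath is the path to mount at inside the container.
	ContainerPath string
	// HostPath is the backing path on the host.
	HostPath string
	// ReadOnly marks the mount read-only.
	ReadOnly bool
	// SELinuxRelabel asks the runtime to relabel the volume contents.
	SELinuxRelabel bool
	// Propagation is the requested mount propagation mode.
	Propagation runtimeapi.MountPropagation
}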

Let's first see how opts is generated, and then come back to m.makeMounts.

GenerateRunContainerOptions

This method is in pkg/kubelet/kubelet_pods.go:

// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
	// Create an empty opts (RunContainerOptions)
	opts, err := kl.containerManager.GetResources(pod, container)
	if err != nil {
		return nil, err
	}

	// Get the Pod's cgroup parent
	cgroupParent := kl.GetPodCgroupParent(pod)
	opts.CgroupParent = cgroupParent
	// Generate the hostname and hostDomainName for the Pod
	hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
	if err != nil {
		return nil, err
	}
	opts.Hostname = hostname
	// podName is the Pod's unique identifier
	podName := volumehelper.GetUniquePodName(pod)
	// Get the Volumes mounted for the pod
	volumes := kl.volumeManager.GetMountedVolumesForPod(podName)
	// Build the container's port mapping table
	opts.PortMappings = kubecontainer.MakePortMappings(container)
	// TODO(random-liu): Move following convert functions into pkg/kubelet/container
	devices, err := kl.makeGPUDevices(pod, container)
	if err != nil {
		return nil, err
	}
	opts.Devices = append(opts.Devices, devices...)

	// TODO: remove feature gate check after no longer needed
	if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
		blkutil := volumeutil.NewBlockVolumePathHandler()
		blkVolumes, err := kl.makeBlockVolumes(pod, container, volumes, blkutil)
		if err != nil {
			return nil, err
		}
		opts.Devices = append(opts.Devices, blkVolumes...)
	}

	// Generate the mount mapping table
	mounts, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
	if err != nil {
		return nil, err
	}
	opts.Mounts = append(opts.Mounts, mounts...)

	// Generate the environment variable list
	envs, err := kl.makeEnvironmentVariables(pod, container, podIP)
	if err != nil {
		return nil, err
	}
	opts.Envs = append(opts.Envs, envs...)

	// Disabling adding TerminationMessagePath on Windows as these files would be mounted as docker volume and
	// Docker for Windows has a bug where only directories can be mounted
	if len(container.TerminationMessagePath) != 0 && runtime.GOOS != "windows" {
		// Create the container directory
		p := kl.getPodContainerDir(pod.UID, container.Name)
		if err := os.MkdirAll(p, 0750); err != nil {
			glog.Errorf("Error on creating %q: %v", p, err)
		} else {
			opts.PodContainerDir = p
		}
	}

	// only do this check if the experimental behavior is enabled, otherwise allow it to default to false
	// If required, allow containers in the Pod to share the host's namespace
	if kl.experimentalHostUserNamespaceDefaulting {
		opts.EnableHostUserNamespace = kl.enableHostUserNamespace(pod)
	}

	return opts, nil
}

As you can see, this method fills in the various Options fields step by step:

  1. Call GetResources to create an empty opts (a RunContainerOptions variable); stepping into the method shows it merely returns an empty struct pointer;
  2. Call GetPodCgroupParent to get the Pod's cgroup parent;
  3. Call GeneratePodHostNameAndDomain to generate the Pod's hostname and hostDomainName;
  4. Call GetUniquePodName to get the Pod's unique identifier;
  5. Call GetMountedVolumesForPod to get the Volumes mounted for the Pod. Note that this method lives in the volumeManager, but internally it obtains the information through the asw.GetMountedVolumesForPod we saw earlier, so I won't expand on it here;
  6. Call MakePortMappings to build the container's port mapping table;
  7. Perform some device-related configuration;
  8. Call makeMounts (the one in the kubelet package) to build the container's initial mount mapping table;
  9. Call makeEnvironmentVariables to build the environment variable list;
  10. Create the container's directory under the Pod's directory; this is just simple path joining and directory creation;
  11. If required, allow containers in the Pod to share the host's namespace.

The step relevant to configuring volume mounts inside the container is step 8, so let's dig into makeMounts to see how the mount table is generated.

makeMounts (kubelet package)

This method is in pkg/kubelet/kubelet_pods.go:

// makeMounts determines the mount points for the given container.
func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
	// Kubernetes only mounts on /etc/hosts if:
	// - container is not an infrastructure (pause) container
	// - container is not already mounting on /etc/hosts
	// - OS is not Windows
	// Kubernetes will not mount /etc/hosts if:
	// - when the Pod sandbox is being created, its IP is still unknown. Hence, PodIP will not have been set.
	mountEtcHostsFile := len(podIP) > 0 && runtime.GOOS != "windows"
	glog.V(3).Infof("container: %v/%v/%v podIP: %q creating hosts mount: %v", pod.Namespace, pod.Name, container.Name, podIP, mountEtcHostsFile)
	// 创建一个空的挂载映射表
	mounts := []kubecontainer.Mount{}
	for _, mount := range container.VolumeMounts {
		// do not mount /etc/hosts if container is already mounting on the path
		mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath)
		vol, ok := podVolumes[mount.Name]
		if !ok || vol.Mounter == nil {
			glog.Errorf("Mount cannot be satisfied for container %q, because the volume is missing or the volume mounter is nil: %+v", container.Name, mount)
			return nil, fmt.Errorf("cannot find volume %q to mount into container %q", mount.Name, container.Name)
		}

		relabelVolume := false
		// If the volume supports SELinux and it has not been
		// relabeled already and it is not a read-only volume,
		// relabel it and mark it as labeled
		if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux && !vol.SELinuxLabeled {
			vol.SELinuxLabeled = true
			relabelVolume = true
		}
		hostPath, err := volume.GetPath(vol.Mounter)
		if err != nil {
			return nil, err
		}
		if mount.SubPath != "" {
			if filepath.IsAbs(mount.SubPath) {
				return nil, fmt.Errorf("error SubPath `%s` must not be an absolute path", mount.SubPath)
			}

			err = volumevalidation.ValidatePathNoBacksteps(mount.SubPath)
			if err != nil {
				return nil, fmt.Errorf("unable to provision SubPath `%s`: %v", mount.SubPath, err)
			}

			fileinfo, err := os.Lstat(hostPath)
			if err != nil {
				return nil, err
			}
			perm := fileinfo.Mode()

			hostPath = filepath.Join(hostPath, mount.SubPath)

			if subPathExists, err := utilfile.FileOrSymlinkExists(hostPath); err != nil {
				glog.Errorf("Could not determine if subPath %s exists; will not attempt to change its permissions", hostPath)
			} else if !subPathExists {
				// Create the sub path now because if it's auto-created later when referenced, it may have an
				// incorrect ownership and mode. For example, the sub path directory must have at least g+rwx
				// when the pod specifies an fsGroup, and if the directory is not created here, Docker will
				// later auto-create it with the incorrect mode 0750
				if err := os.MkdirAll(hostPath, perm); err != nil {
					glog.Errorf("failed to mkdir:%s", hostPath)
					return nil, err
				}

				// chmod the sub path because umask may have prevented us from making the sub path with the same
				// permissions as the mounter path
				if err := os.Chmod(hostPath, perm); err != nil {
					return nil, err
				}
			}
		}

		// Docker Volume Mounts fail on Windows if it is not of the form C:/
		containerPath := mount.MountPath
		if runtime.GOOS == "windows" {
			if (strings.HasPrefix(hostPath, "/") || strings.HasPrefix(hostPath, "\\")) && !strings.Contains(hostPath, ":") {
				hostPath = "c:" + hostPath
			}
		}
		if !filepath.IsAbs(containerPath) {
			containerPath = makeAbsolutePath(runtime.GOOS, containerPath)
		}

		propagation, err := translateMountPropagation(mount.MountPropagation)
		if err != nil {
			return nil, err
		}
		glog.V(5).Infof("Pod %q container %q mount %q has propagation %q", format.Pod(pod), container.Name, mount.Name, propagation)

		mounts = append(mounts, kubecontainer.Mount{
			Name:           mount.Name,
			ContainerPath:  containerPath,
			HostPath:       hostPath,
			ReadOnly:       mount.ReadOnly,
			SELinuxRelabel: relabelVolume,
			Propagation:    propagation,
		})
	}
	if mountEtcHostsFile {
		hostAliases := pod.Spec.HostAliases
		hostsMount, err := makeHostsMount(podDir, podIP, hostName, hostDomain, hostAliases, pod.Spec.HostNetwork)
		if err != nil {
			return nil, err
		}
		mounts = append(mounts, *hostsMount)
	}
	return mounts, nil
}

Before analyzing this source, let's look at the VolumeMount struct, i.e. the entries of pod.Spec.Containers[].VolumeMounts, which carry a container's volume mount requests:

// VolumeMount describes a mounting of a Volume within a container.
type VolumeMount struct {
	// This must match the Name of a Volume.
	Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
	// Mounted read-only if true, read-write otherwise (false or unspecified).
	// Defaults to false.
	// +optional
	ReadOnly bool `json:"readOnly,omitempty" protobuf:"varint,2,opt,name=readOnly"`
	// Path within the container at which the volume should be mounted.  Must
	// not contain ':'.
	MountPath string `json:"mountPath" protobuf:"bytes,3,opt,name=mountPath"`
	// Path within the volume from which the container's volume should be mounted.
	// Defaults to "" (volume's root).
	// +optional
	SubPath string `json:"subPath,omitempty" protobuf:"bytes,4,opt,name=subPath"`
	// mountPropagation determines how mounts are propagated from the host
	// to container and the other way around.
	// When not set, MountPropagationHostToContainer is used.
	// This field is alpha in 1.8 and can be reworked or removed in a future
	// release.
	// +optional
	MountPropagation *MountPropagationMode `json:"mountPropagation,omitempty" protobuf:"bytes,5,opt,name=mountPropagation,casttype=MountPropagationMode"`
}

Here, the Name field is the name of the Volume the container wants to mount, and ReadOnly says whether the mount is read-only. MountPath is the mount point inside the container; as the comment says, it is a path "within the container". SubPath is relative to the Volume's root, i.e. a subdirectory on the host; it defaults to empty, meaning the container mounts the Volume's root directory directly. A concrete example follows.
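
As a made-up example: the VolumeMount values below ask for the site1 subdirectory of the data volume to appear read-only at /usr/share/nginx/html inside the container. The runnable sketch shows how makeMounts derives the host-side path, assuming the kubelet's standard per-pod volume directory layout for the volume root:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// A hypothetical entry from pod.Spec.Containers[i].VolumeMounts.
	name := "data"
	mountPath := "/usr/share/nginx/html"
	subPath := "site1"

	// volume.GetPath(vol.Mounter) returns the volume's root on the host;
	// this is the kubelet's usual per-pod volume directory layout.
	hostPath := "/var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~empty-dir/" + name
	if subPath != "" {
		// After validation, makeMounts joins SubPath onto the volume root.
		hostPath = filepath.Join(hostPath, subPath)
	}
	fmt.Printf("%s -> %s (ro)\n", hostPath, mountPath)
	// .../kubernetes.io~empty-dir/data/site1 -> /usr/share/nginx/html (ro)
}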

Back to the method; its main flow is as follows:

  1. Create an empty mount mapping table mounts;
  2. For each mount mapping the container requests (each VolumeMount above), perform the following steps:
  3. Look up the concrete Volume information in podVolumes;
  4. Call GetPath to get the Volume's root path, i.e. hostPath;
  5. Run some necessary validation on hostPath and SubPath, then join them into the new mount path. For the details of the validation and joining, and a related vulnerability, see an earlier post: CVE-2017;
  6. With the host-side path in hand, compute the path to mount inside the container: some simple path manipulation yields the absolute in-container path containerPath;
  7. From the volume Name, the hostPath with SubPath joined on, the in-container containerPath, the ReadOnly flag, plus the two remaining attributes (SELinuxRelabel and Propagation), build a complete mount mapping and append it to mounts.

Once every VolumeMount has been processed, the container's mount mapping table is complete and the method returns. From the analysis above, the core of this method is producing the host-to-container path mappings plus a few attributes and passing them along as configuration; it never actually reaches into the OS to perform a mount.

makeMounts (kuberuntime package)

With opts generated, the final configuration config can be built from it:

config := &runtimeapi.ContainerConfig{
	Metadata: &runtimeapi.ContainerMetadata{
		Name:    container.Name,
		Attempt: restartCountUint32,
	},
	Image:       &runtimeapi.ImageSpec{Image: imageRef},
	Command:     command,
	Args:        args,
	WorkingDir:  container.WorkingDir,
	Labels:      newContainerLabels(container, pod),
	Annotations: newContainerAnnotations(container, pod, restartCount),
	Devices:     makeDevices(opts),
	Mounts:      m.makeMounts(opts, container),
	LogPath:     containerLogsPath,
	Stdin:       container.Stdin,
	StdinOnce:   container.StdinOnce,
	Tty:         container.TTY,
	Linux:       m.generateLinuxContainerConfig(container, pod, uid, username),
}

Notice that config.Mounts does not simply reference the opts.Mounts discussed above; another pass is made over it. In other words, the earlier mount mapping table was only preliminary and gets reshaped here.

Let's look at this makeMounts, located in pkg/kubelet/kuberuntime/kuberuntime_container.go:

// makeMounts generates container volume mounts for kubelet runtime v1.
func (m *kubeGenericRuntimeManager) makeMounts(opts *kubecontainer.RunContainerOptions, container *v1.Container) []*runtimeapi.Mount {
	volumeMounts := []*runtimeapi.Mount{}

	for idx := range opts.Mounts {
		v := opts.Mounts[idx]
		selinuxRelabel := v.SELinuxRelabel && selinux.SELinuxEnabled()
		mount := &runtimeapi.Mount{
			HostPath:       v.HostPath,
			ContainerPath:  v.ContainerPath,
			Readonly:       v.ReadOnly,
			SelinuxRelabel: selinuxRelabel,
			Propagation:    v.Propagation,
		}

		volumeMounts = append(volumeMounts, mount)
	}

	// The reason we create and mount the log file in here (not in kubelet) is because
	// the file's location depends on the ID of the container, and we need to create and
	// mount the file before actually starting the container.
	if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 {
		// Because the PodContainerDir contains pod uid and container name which is unique enough,
		// here we just add a random id to make the path unique for different instances
		// of the same container.
		cid := makeUID()
		containerLogPath := filepath.Join(opts.PodContainerDir, cid)
		fs, err := m.osInterface.Create(containerLogPath)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("error on creating termination-log file %q: %v", containerLogPath, err))
		} else {
			fs.Close()

			// Chmod is needed because ioutil.WriteFile() ends up calling
			// open(2) to create the file, so the final mode used is "mode &
			// ~umask". But we want to make sure the specified mode is used
			// in the file no matter what the umask is.
			if err := m.osInterface.Chmod(containerLogPath, 0666); err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to set termination-log file permissions %q: %v", containerLogPath, err))
			}

			selinuxRelabel := selinux.SELinuxEnabled()
			volumeMounts = append(volumeMounts, &runtimeapi.Mount{
				HostPath:       containerLogPath,
				ContainerPath:  container.TerminationMessagePath,
				SelinuxRelabel: selinuxRelabel,
			})
		}
	}

	return volumeMounts
}

This method does essentially two things:

  1. Rebuild the earlier mount mapping table in the CRI's runtimeapi.Mount format, which boils down to dropping the Name field, nothing more;
  2. Call the OS interface's Create to create a file at PodContainerDir/cid. That file is then mapped to the container's termination message path, producing one more mount entry appended to the table; this mount exists solely so the container can write out its termination message (see the illustrative entry below).

Once the mount table has been rebuilt, it is simply returned.
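
That extra entry pairs a per-instance host file with the container's TerminationMessagePath (which defaults to /dev/termination-log). Roughly, and with hypothetical paths:

// Illustrative termination-message mount appended by this makeMounts; the
// <cid> segment stands for the random uid from makeUID().
&runtimeapi.Mount{
	HostPath:      "/var/lib/kubelet/pods/<pod-uid>/containers/nginx/<cid>",
	ContainerPath: "/dev/termination-log", // the default TerminationMessagePath
}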

At this point generateContainerConfig is done, having produced the configuration needed to create the container => containerConfig. startContainer then hands this configuration to CreateContainer to begin creating the container.

That method is provided by the dockerService; it creates the container and completes the volume mounts inside it. Let's follow it to see how the mounting is actually done.

CreateContainer (dockershim package)

This method is in pkg/kubelet/dockershim/docker_container.go:

// CreateContainer creates a new container in the given PodSandbox
// Docker cannot store the log to an arbitrary location (yet), so we create an
// symlink at LogPath, linking to the actual path of the log.
// TODO: check if the default values returned by the runtime API are ok.
func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
	if config == nil {
		return "", fmt.Errorf("container config is nil")
	}
	if sandboxConfig == nil {
		return "", fmt.Errorf("sandbox config is nil for container %q", config.Metadata.Name)
	}

	labels := makeLabels(config.GetLabels(), config.GetAnnotations())
	// Apply a the container type label.
	labels[containerTypeLabelKey] = containerTypeLabelContainer
	// Write the container log path in the labels.
	labels[containerLogPathLabelKey] = filepath.Join(sandboxConfig.LogDirectory, config.LogPath)
	// Write the sandbox ID in the labels.
	labels[sandboxIDLabelKey] = podSandboxID

	apiVersion, err := ds.getDockerAPIVersion()
	if err != nil {
		return "", fmt.Errorf("unable to get the docker API version: %v", err)
	}
	securityOptSep := getSecurityOptSeparator(apiVersion)

	image := ""
	if iSpec := config.GetImage(); iSpec != nil {
		image = iSpec.Image
	}
	// Build the new configuration docker needs to create the container from the incoming config => createConfig
	createConfig := dockertypes.ContainerCreateConfig{
		Name: makeContainerName(sandboxConfig, config),
		Config: &dockercontainer.Config{
			// TODO: set User.
			Entrypoint: dockerstrslice.StrSlice(config.Command),
			Cmd:        dockerstrslice.StrSlice(config.Args),
			Env:        generateEnvList(config.GetEnvs()),
			Image:      image,
			WorkingDir: config.WorkingDir,
			Labels:     labels,
			// Interactive containers:
			OpenStdin: config.Stdin,
			StdinOnce: config.StdinOnce,
			Tty:       config.Tty,
			// Disable Docker's health check until we officially support it
			// (https://github.com/kubernetes/kubernetes/issues/25829).
			Healthcheck: &dockercontainer.HealthConfig{
				Test: []string{"NONE"},
			},
		},
		HostConfig: &dockercontainer.HostConfig{
			// Convert the mount mapping table generated earlier into a format docker understands => a string slice
			Binds: generateMountBindings(config.GetMounts()),
		},
	}

	hc := createConfig.HostConfig
	ds.updateCreateConfig(&createConfig, config, sandboxConfig, podSandboxID, securityOptSep, apiVersion)
	// Set devices for container.
	devices := make([]dockercontainer.DeviceMapping, len(config.Devices))
	for i, device := range config.Devices {
		devices[i] = dockercontainer.DeviceMapping{
			PathOnHost:        device.HostPath,
			PathInContainer:   device.ContainerPath,
			CgroupPermissions: device.Permissions,
		}
	}
	hc.Resources.Devices = devices

	securityOpts, err := ds.getSecurityOpts(config.GetLinux().GetSecurityContext().GetSeccompProfilePath(), securityOptSep)
	if err != nil {
		return "", fmt.Errorf("failed to generate security options for container %q: %v", config.Metadata.Name, err)
	}

	hc.SecurityOpt = append(hc.SecurityOpt, securityOpts...)

	// Create the container via the docker client
	createResp, err := ds.client.CreateContainer(createConfig)
	if err != nil {
		createResp, err = recoverFromCreationConflictIfNeeded(ds.client, createConfig, err)
	}

	if createResp != nil {
		return createResp.ID, err
	}
	return "", err
}

Setting aside the other details of this method, we focus on its three core steps:

  1. The config generated earlier is not in docker's format, so a new createConfig is built from it; this is the configuration docker can actually understand;
  2. Within it, createConfig.HostConfig.Binds is the list of volumes the container will mount. Docker cannot read the mount mapping table produced by makeMounts, so generateMountBindings is called to convert it into a format docker understands: a string slice;
  3. With the configuration built, the docker client's CreateContainer method is called to create the container.

First, let's see how generateMountBindings works and what the final normalized mount format looks like.

generateMountBindings

This method is in pkg/kubelet/dockershim/helpers.go:

// generateMountBindings converts the mount list to a list of strings that
// can be understood by docker.
// '<HostPath>:<ContainerPath>[:options]', where 'options'
// is a comma-separated list of the following strings:
// 'ro', if the path is read only
// 'Z', if the volume requires SELinux relabeling
// propagation mode such as 'rslave'
func generateMountBindings(mounts []*runtimeapi.Mount) []string {
	result := make([]string, 0, len(mounts))
	for _, m := range mounts {
		bind := fmt.Sprintf("%s:%s", m.HostPath, m.ContainerPath)
		var attrs []string
		if m.Readonly {
			attrs = append(attrs, "ro")
		}
		// Only request relabeling if the pod provides an SELinux context. If the pod
		// does not provide an SELinux context relabeling will label the volume with
		// the container's randomly allocated MCS label. This would restrict access
		// to the volume to the container which mounts it first.
		if m.SelinuxRelabel {
			attrs = append(attrs, "Z")
		}
		switch m.Propagation {
		case runtimeapi.MountPropagation_PROPAGATION_PRIVATE:
			// noop, private is default
		case runtimeapi.MountPropagation_PROPAGATION_BIDIRECTIONAL:
			attrs = append(attrs, "rshared")
		case runtimeapi.MountPropagation_PROPAGATION_HOST_TO_CONTAINER:
			attrs = append(attrs, "rslave")
		default:
			glog.Warningf("unknown propagation mode for hostPath %q", m.HostPath)
			// Falls back to "private"
		}

		if len(attrs) > 0 {
			bind = fmt.Sprintf("%s:%s", bind, strings.Join(attrs, ","))
		}
		result = append(result, bind)
	}
	return result
}

The implementation is simple: extract three elements, the host path, the in-container path, and the mount attributes. Docker's normalized mount entry is equally simple: just hostPath:containerPath:attrs.
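
Here is a standalone, runnable re-creation of that formatting for one made-up mount (read-only, host-to-container propagation), showing the exact string docker receives:

package main

import (
	"fmt"
	"strings"
)

func main() {
	hostPath := "/var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~secret/tls-certs"
	containerPath := "/etc/tls"
	attrs := []string{"ro", "rslave"} // read-only + HOST_TO_CONTAINER propagation

	bind := fmt.Sprintf("%s:%s", hostPath, containerPath)
	if len(attrs) > 0 {
		bind = fmt.Sprintf("%s:%s", bind, strings.Join(attrs, ","))
	}
	fmt.Println(bind)
	// .../kubernetes.io~secret/tls-certs:/etc/tls:ro,rslave
}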

Next, let's follow client.CreateContainer to see how the mounting proceeds.

CreateContainer (libdocker package)

This method is in pkg/kubelet/dockershim/libdocker/kube_docker_client.go:

func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
	ctx, cancel := d.getTimeoutContext()
	defer cancel()
	// we provide an explicit default shm size as to not depend on docker daemon.
	// TODO: evaluate exposing this as a knob in the API
	if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
		opts.HostConfig.ShmSize = defaultShmSize
	}
	createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
	if ctxErr := contextError(ctx); ctxErr != nil {
		return nil, ctxErr
	}
	if err != nil {
		return nil, err
	}
	return &createResp, nil
}

As you can see, this method is implemented via client.ContainerCreate, so we can follow that directly.

ContainerCreate

This method is in github.com/docker/docker/client/container_create.go:

// ContainerCreate creates a new container based in the given configuration.
// It can be associated with a name, but it's not mandatory.
func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) {
	var response container.ContainerCreateCreatedBody

	if err := cli.NewVersionError("1.25", "stop timeout"); config != nil && config.StopTimeout != nil && err != nil {
		return response, err
	}

	// When using API 1.24 and under, the client is responsible for removing the container
	if hostConfig != nil && versions.LessThan(cli.ClientVersion(), "1.25") {
		hostConfig.AutoRemove = false
	}

	query := url.Values{}
	if containerName != "" {
		query.Set("name", containerName)
	}

	body := configWrapper{
		Config:           config,
		HostConfig:       hostConfig,
		NetworkingConfig: networkingConfig,
	}

	serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
	if err != nil {
		if serverResp.statusCode == 404 && strings.Contains(err.Error(), "No such image") {
			return response, imageNotFoundError{config.Image}
		}
		return response, err
	}

	err = json.NewDecoder(serverResp.body).Decode(&response)
	ensureReaderClosed(serverResp)
	return response, err
}

The key structure variable in this method is body, the skeleton of the request body, whose three fields are roughly:

  • Config: this is createConfig.Config, containing most of the container's configuration;
  • HostConfig: this is createConfig.HostConfig, whose Binds field carries the mount mappings from earlier;
  • NetworkingConfig: this is createConfig.NetworkingConfig, the container's network configuration.

Once built, this request body is posted to the docker API over the network to create the container:

// post sends an http request to the docker API using the method POST with a specific Go context.
serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
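
To get a feel for what actually goes over the wire, here is a runnable sketch of the JSON body for POST /containers/create, trimmed to the fields discussed in this post. In the real configWrapper the container.Config fields are embedded at the top level of the JSON with HostConfig nested beside them, which is the shape mimicked below; the image name and paths are illustrative:

package main

import (
	"encoding/json"
	"os"
)

// Trimmed stand-ins for the docker client types.
type hostConfig struct {
	Binds []string `json:"Binds"`
}

type createBody struct {
	Image      string     `json:"Image"`
	Cmd        []string   `json:"Cmd"`
	HostConfig hostConfig `json:"HostConfig"`
}

func main() {
	body := createBody{
		Image: "nginx:1.13",
		Cmd:   []string{"nginx", "-g", "daemon off;"},
		HostConfig: hostConfig{
			// The strings produced by generateMountBindings land here.
			Binds: []string{
				"/var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~secret/tls-certs:/etc/tls:ro,rslave",
			},
		},
	}
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", "  ")
	_ = enc.Encode(body) // roughly the body cli.post sends to /containers/create
}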

Following this post method further gets into how docker wraps and sends the request. There is a lot more to unpack there, so I plan to cover it in a separate post.

Summary

At this point, the Container volume mounting flow within Pod creation is reasonably clear. Many methods appear to be involved, but it's really just layer upon layer of wrapping. In summary, there are two big parts:

  • Generate the container-creation configuration, which includes the volume mount mappings;
  • Wrap that configuration into a request body and call the docker API to create the container.
