This article walks through the Kubernetes source code that implements volume management.
Volumes are an important part of k8s: they store the system and business data that pods produce. The volume management logic lives in kubelet.
We start with kubelet's entry point:
func main() {
    s := options.NewKubeletServer()
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    if err := app.Run(s, nil); err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }
}
It is easy to see that the run method contains all of kubelet's important setup:
func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
    // validate configuration
    ...
    if kubeDeps == nil {
        ...
        kubeDeps, err = UnsecuredKubeletDeps(s)
        ...
    }
    // initialize managers such as cAdvisor and containerManager
    ...
    if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
        return err
    }
    ...
}
Two of the methods it calls matter for volume management.
UnsecuredKubeletDeps: initializes core system components such as the docker client, the network plugins, and the volume plugins. Because these are not meant to be exposed externally, the method is named "unsecured". What we care about here is how it initializes the volume plugins:
func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {
    ...
    return &kubelet.KubeletDeps{
        Auth:               nil,
        CAdvisorInterface:  nil,
        Cloud:              nil,
        ContainerManager:   nil,
        DockerClient:       dockerClient,
        KubeClient:         nil,
        ExternalKubeClient: nil,
        Mounter:            mounter,
        NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir),
        OOMAdjuster:        oom.NewOOMAdjuster(),
        OSInterface:        kubecontainer.RealOS{},
        Writer:             writer,
        VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir),
        TLSOptions:         tlsOptions,
    }, nil
}
When the volume plugins are initialized, VolumePluginDir is passed in as the path for custom plugins. The default path is /usr/libexec/kubernetes/kubelet-plugins/volume/exec/
func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin {
    allPlugins := []volume.VolumePlugin{}

    allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(volume.VolumeConfig{})...)
    allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(volume.VolumeConfig{})...)
    allPlugins = append(allPlugins, secret.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, quobyte.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, cephfs.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(pluginDir)...)
    allPlugins = append(allPlugins, azure_file.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, configmap.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, photon_pd.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...)
    allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
    return allPlugins
}
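Among all these plugins, only flexvolume takes the pluginDir parameter, which indicates that it is the only one that supports custom implementations. How kubelet interacts with these plugins, and which interfaces they expose, requires reading further into the code.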
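To make the role of pluginDir more concrete, here is a minimal sketch of how a driver directory under that path could be mapped to a driver name and an executable. It assumes the `vendor~driver` directory layout described in the flexvolume documentation; `driverNameFromDir` and `executablePath` are hypothetical helpers written for illustration, not functions from the kubelet source.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// driverNameFromDir turns a driver directory name such as "k8s~lvm" into the
// driver name "k8s/lvm". The vendor~driver layout is an assumption here.
func driverNameFromDir(dirName string) string {
	return strings.Replace(dirName, "~", "/", -1)
}

// executablePath guesses where the driver executable lives:
// <pluginDir>/<vendor~driver>/<driver>. Hypothetical helper.
func executablePath(pluginDir, dirName string) string {
	parts := strings.Split(dirName, "~")
	return path.Join(pluginDir, dirName, parts[len(parts)-1])
}

func main() {
	pluginDir := "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/"
	fmt.Println(driverNameFromDir("k8s~lvm"))
	fmt.Println(executablePath(pluginDir, "k8s~lvm"))
}
```

Under these assumptions, the lvm script shown later in this article would be installed as an executable named `lvm` inside a `k8s~lvm` directory.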
RunKubelet: this is what actually starts the kubelet service. Its most important work is hidden in startKubelet:
func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
    // initialize the bootstrapper
    ...
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet %s as runonce", version.Get().String())
    } else {
        startKubelet(k, podCfg, kubeCfg, kubeDeps)
        glog.Infof("Started kubelet %s", version.Get().String())
    }
    return nil
}
startKubelet consists of two phases: syncing pod information and starting the kubelet server.
func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {
    // sync pod information
    go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

    // start the kubelet server
    if kubeCfg.EnableServer {
        go wait.Until(func() {
            k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
        }, 0, wait.NeverStop)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go wait.Until(func() {
            k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }, 0, wait.NeverStop)
    }
}
Following the Run method that syncs pod information leads to this code:
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
        // sync node status
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    }

    // sync pod information
    kl.pleg.Start()
    kl.syncLoop(updates, kl)
}
kl.volumeManager is the core interface through which kubelet manages volumes:
type VolumeManager interface {
    Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
    WaitForAttachAndMount(pod *v1.Pod) error
    GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
    GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
    GetVolumesInUse() []v1.UniqueVolumeName
    ReconcilerStatesHasBeenSynced() bool
    VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
    MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
VolumeManager's Run starts an asynchronous loop. When a pod is scheduled to this node, the loop checks every volume the pod requests and performs attach/detach/mount/unmount operations according to the relationship between those volumes and the pod.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    defer runtime.HandleCrash()

    go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
    glog.V(2).Infof("The desired_state_of_world populator starts")

    glog.Infof("Starting Kubelet Volume Manager")
    go vm.reconciler.Run(stopCh)

    <-stopCh
    glog.Infof("Shutting down Kubelet Volume Manager")
}
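The shape of this Run method (start two background loops, then block until the stop channel closes) can be sketched with the standard library alone. This is only an illustration: `runUntil` is a simplified stand-in for a periodic helper such as `wait.Until`, and `manager`, `populate`, `reconcile`, and `demo` are hypothetical names. Unlike the kubelet code, the sketch also waits for both loops to exit so the demo finishes cleanly.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runUntil calls f, then keeps calling it every period until stopCh is closed.
func runUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// manager is a hypothetical stand-in for volumeManager.
type manager struct {
	populate  func() // updates the desired state
	reconcile func() // drives the actual state toward the desired state
}

// Run starts both loops and blocks until stopCh is closed.
func (m *manager) Run(stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); runUntil(m.populate, time.Millisecond, stopCh) }()
	go func() { defer wg.Done(); runUntil(m.reconcile, time.Millisecond, stopCh) }()
	<-stopCh
	wg.Wait()
}

// demo runs the manager briefly and reports how often each loop fired.
func demo() (populated, reconciled int64) {
	var p, r int64
	m := &manager{
		populate:  func() { atomic.AddInt64(&p, 1) },
		reconcile: func() { atomic.AddInt64(&r, 1) },
	}
	stopCh := make(chan struct{})
	done := make(chan struct{})
	go func() { m.Run(stopCh); close(done) }()
	time.Sleep(20 * time.Millisecond)
	close(stopCh)
	<-done
	return atomic.LoadInt64(&p), atomic.LoadInt64(&r)
}

func main() {
	p, r := demo()
	fmt.Println(p > 0, r > 0)
}
```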
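The two calls to focus on are vm.desiredStateOfWorldPopulator.Run and vm.reconciler.Run. Before describing them, one piece of background is needed, because it is the key to understanding both.
kubelet manages volumes based on two distinct states: DesiredStateOfWorld (the desired state) and ActualStateOfWorld (the actual state).
With these two states in mind, the purpose of vm.desiredStateOfWorldPopulator.Run becomes clear: it updates DesiredStateOfWorld from the pod information synced from the apiserver. The other method, vm.reconciler.Run, reconciles the desired and actual states: it is responsible for adjusting the actual state until it matches the desired state. How the desired state is updated, and how exactly the reconciler does its work, requires reading further into the code.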
Following vm.desiredStateOfWorldPopulator.Run, we find this logic:
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
    for _, pod := range dswp.podManager.GetPods() {
        if dswp.isPodTerminated(pod) {
            continue
        }
        dswp.processPodVolumes(pod)
    }
}
kubelet syncs newly added pods into the podManager of desiredStateOfWorldPopulator. The code above iterates over the pods there that have not terminated and hands each one to desiredStateOfWorldPopulator for processing:
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
    ...
    for _, podVolume := range pod.Spec.Volumes {
        volumeSpec, volumeGidValue, err :=
            dswp.createVolumeSpec(podVolume, pod.Namespace)
        if err != nil {
            glog.Errorf(
                "Error processing volume %q for pod %q: %v",
                podVolume.Name,
                format.Pod(pod),
                err)
            continue
        }

        _, err = dswp.desiredStateOfWorld.AddPodToVolume(
            uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
        if err != nil {
            glog.Errorf(
                "Failed to add volume %q (specName: %q) for pod %q to desiredStateOfWorld. err=%v",
                podVolume.Name,
                volumeSpec.Name(),
                uniquePodName,
                err)
        }

        glog.V(10).Infof(
            "Added volume %q (volSpec=%q) for pod %q to desired state.",
            podVolume.Name,
            volumeSpec.Name(),
            uniquePodName)
    }

    dswp.markPodProcessed(uniquePodName)
}
desiredStateOfWorldPopulator does no heavy lifting itself. It acts as a proxy: it hands the logic that controls a pod's desired state to desiredStateOfWorld and then marks the pod as processed.
func (dsw *desiredStateOfWorld) AddPodToVolume(
    podName types.UniquePodName,
    pod *v1.Pod,
    volumeSpec *volume.Spec,
    outerVolumeSpecName string,
    volumeGidValue string) (v1.UniqueVolumeName, error) {
    ...
    dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
        podName:             podName,
        pod:                 pod,
        spec:                volumeSpec,
        outerVolumeSpecName: outerVolumeSpecName,
    }

    return volumeName, nil
}
In this excerpt we skip the preprocessing steps and go straight to the core: the desired state is recorded in a map structure that binds volumes to pods, and this map is the reference for which bindings should exist.
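The nested map described above can be sketched in a few lines. The field names mirror the excerpt (volumesToMount, podsToMount), but the types are simplified to plain strings and the methods are hypothetical illustrations rather than the kubelet implementation.

```go
package main

import "fmt"

// podToMount records one pod that wants a given volume (simplified).
type podToMount struct {
	podName             string
	outerVolumeSpecName string
}

// volumeToMount groups every pod that wants the same volume.
type volumeToMount struct {
	volumeName  string
	podsToMount map[string]podToMount
}

// desiredState maps volume name -> pods that should have it mounted.
type desiredState struct {
	volumesToMount map[string]*volumeToMount
}

func newDesiredState() *desiredState {
	return &desiredState{volumesToMount: make(map[string]*volumeToMount)}
}

// addPodToVolume binds a pod to a volume, creating the volume entry on first use.
func (d *desiredState) addPodToVolume(podName, volumeName, outerName string) {
	v, ok := d.volumesToMount[volumeName]
	if !ok {
		v = &volumeToMount{volumeName: volumeName, podsToMount: make(map[string]podToMount)}
		d.volumesToMount[volumeName] = v
	}
	v.podsToMount[podName] = podToMount{podName: podName, outerVolumeSpecName: outerName}
}

// podExistsInVolume reports whether the binding is part of the desired state.
func (d *desiredState) podExistsInVolume(podName, volumeName string) bool {
	v, ok := d.volumesToMount[volumeName]
	if !ok {
		return false
	}
	_, ok = v.podsToMount[podName]
	return ok
}

func main() {
	dsw := newDesiredState()
	dsw.addPodToVolume("web-0", "vg/data", "data")
	fmt.Println(dsw.podExistsInVolume("web-0", "vg/data"))
	fmt.Println(dsw.podExistsInVolume("web-1", "vg/data"))
}
```

Keying the outer map by volume rather than by pod makes it cheap to ask whether any pod still wants a volume, which is the question the reconciler has to answer before detaching.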
Having covered desiredStateOfWorldPopulator, we move on to the other core interface, the reconciler. It is the most important controller in the volume manager.
Following the reconciler's Run method, we arrive at its core code:
func (rc *reconciler) reconcile() {
    // unmount
    for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
            ...
            err := rc.operationExecutor.UnmountVolume(
                mountedVolume.MountedVolume, rc.actualStateOfWorld)
            ...
        }
    }

    // attach/mount
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        volumeToMount.DevicePath = devicePath
        if cache.IsVolumeNotAttachedError(err) {
            ...
            err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
            ...
        } else if !volMounted || cache.IsRemountRequiredError(err) {
            ...
            err := rc.operationExecutor.MountVolume(
                rc.waitForAttachTimeout,
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld)
            ...
        }
    }

    // detach/unmount
    for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
        if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
            !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
            if attachedVolume.GloballyMounted {
                ...
                err := rc.operationExecutor.UnmountDevice(
                    attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
                ...
            } else {
                ...
                err := rc.operationExecutor.DetachVolume(
                    attachedVolume.AttachedVolume, false, rc.actualStateOfWorld)
                ...
            }
        }
    }
}
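Redundant code has been omitted here to leave only the core. This control logic is a reconciler: it compares the actual state with the desired state and performs whatever operations are needed to remove the difference.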
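The three passes of reconcile can be illustrated with a small self-contained sketch that only computes which actions a single pass would request. It is a simplification under stated assumptions: state is held in plain maps, `binding` and `reconcile` are hypothetical names, and the real code additionally distinguishes UnmountDevice from DetachVolume and skips volumes with pending operations.

```go
package main

import (
	"fmt"
	"sort"
)

// binding is one (pod, volume) pair.
type binding struct{ pod, volume string }

// reconcile returns the actions one pass would request, given the desired
// bindings, the bindings actually mounted, and the volumes actually attached.
func reconcile(desired, mounted map[binding]bool, attached map[string]bool) []string {
	var actions []string

	// Pass 1: unmount bindings that exist but are no longer desired.
	var pass []string
	for b := range mounted {
		if !desired[b] {
			pass = append(pass, "unmount "+b.pod+"/"+b.volume)
		}
	}
	sort.Strings(pass)
	actions = append(actions, pass...)

	// Pass 2: attach volumes that are desired but not attached,
	// otherwise mount bindings that are desired but not mounted.
	pass = nil
	attachRequested := map[string]bool{}
	for b := range desired {
		switch {
		case !attached[b.volume]:
			if !attachRequested[b.volume] {
				attachRequested[b.volume] = true
				pass = append(pass, "attach "+b.volume)
			}
		case !mounted[b]:
			pass = append(pass, "mount "+b.pod+"/"+b.volume)
		}
	}
	sort.Strings(pass)
	actions = append(actions, pass...)

	// Pass 3: detach attached volumes that no pod mounts or wants.
	pass = nil
	inUse := map[string]bool{}
	for b := range mounted {
		inUse[b.volume] = true
	}
	for b := range desired {
		inUse[b.volume] = true
	}
	for v, ok := range attached {
		if ok && !inUse[v] {
			pass = append(pass, "detach "+v)
		}
	}
	sort.Strings(pass)
	return append(actions, pass...)
}

func main() {
	desired := map[binding]bool{{"web", "data"}: true}
	mounted := map[binding]bool{{"old", "logs"}: true}
	attached := map[string]bool{"logs": true, "scratch": true}
	for _, a := range reconcile(desired, mounted, attached) {
		fmt.Println(a)
	}
}
```

In this example the volume "logs" is not detached in the same pass that unmounts it, because it still counts as mounted in the snapshot; a later pass would pick it up, which matches the loop-until-converged style of the reconciler.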
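With a custom flexvolume plugin, the methods above call the operations the plugin implements by executing the plugin's driver as an external command.
Below is the lvm plugin that ships with flex volume. If you need to support the mount and unmount operations, you can add them to this script: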
#!/bin/bash

# Copyright 2015 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Notes:
#  - Please install "jq" package before using this driver.

usage() {
    err "Invalid usage. Usage: "
    err "\t$0 init"
    err "\t$0 attach <json params> <nodename>"
    err "\t$0 detach <mount device> <nodename>"
    err "\t$0 waitforattach <mount device> <json params>"
    err "\t$0 mountdevice <mount dir> <mount device> <json params>"
    err "\t$0 unmountdevice <mount dir>"
    err "\t$0 isattached <json params> <nodename>"
    exit 1
}

err() {
    echo -ne $* 1>&2
}

log() {
    echo -ne $* >&1
}

ismounted() {
    MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1`
    if [ "${MOUNT}" == "${MNTPATH}" ]; then
        echo "1"
    else
        echo "0"
    fi
}

getdevice() {
    VOLUMEID=$(echo ${JSON_PARAMS} | jq -r '.volumeID')
    VG=$(echo ${JSON_PARAMS}|jq -r '.volumegroup')

    # LVM substitutes - with --
    VOLUMEID=`echo $VOLUMEID|sed s/-/--/g`
    VG=`echo $VG|sed s/-/--/g`

    DMDEV="/dev/mapper/${VG}-${VOLUMEID}"
    echo ${DMDEV}
}

attach() {
    JSON_PARAMS=$1
    SIZE=$(echo $1 | jq -r '.size')

    DMDEV=$(getdevice)
    if [ ! -b "${DMDEV}" ]; then
        err "{\"status\": \"Failure\", \"message\": \"Volume ${VOLUMEID} does not exist\"}"
        exit 1
    fi
    log "{\"status\": \"Success\", \"device\":\"${DMDEV}\"}"
    exit 0
}

detach() {
    log "{\"status\": \"Success\"}"
    exit 0
}

waitforattach() {
    shift
    attach $*
}

domountdevice() {
    MNTPATH=$1
    DMDEV=$2
    FSTYPE=$(echo $3|jq -r '.["kubernetes.io/fsType"]')

    if [ ! -b "${DMDEV}" ]; then
        err "{\"status\": \"Failure\", \"message\": \"${DMDEV} does not exist\"}"
        exit 1
    fi

    if [ $(ismounted) -eq 1 ] ; then
        log "{\"status\": \"Success\"}"
        exit 0
    fi

    VOLFSTYPE=`blkid -o udev ${DMDEV} 2>/dev/null|grep "ID_FS_TYPE"|cut -d"=" -f2`
    if [ "${VOLFSTYPE}" == "" ]; then
        mkfs -t ${FSTYPE} ${DMDEV} >/dev/null 2>&1
        if [ $? -ne 0 ]; then
            err "{ \"status\": \"Failure\", \"message\": \"Failed to create fs ${FSTYPE} on device ${DMDEV}\"}"
            exit 1
        fi
    fi

    mkdir -p ${MNTPATH} &> /dev/null

    mount ${DMDEV} ${MNTPATH} &> /dev/null
    if [ $? -ne 0 ]; then
        err "{ \"status\": \"Failure\", \"message\": \"Failed to mount device ${DMDEV} at ${MNTPATH}\"}"
        exit 1
    fi
    log "{\"status\": \"Success\"}"
    exit 0
}

unmountdevice() {
    MNTPATH=$1
    if [ ! -d ${MNTPATH} ]; then
        log "{\"status\": \"Success\"}"
        exit 0
    fi

    if [ $(ismounted) -eq 0 ] ; then
        log "{\"status\": \"Success\"}"
        exit 0
    fi

    umount ${MNTPATH} &> /dev/null
    if [ $? -ne 0 ]; then
        err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"
        exit 1
    fi

    log "{\"status\": \"Success\"}"
    exit 0
}

isattached() {
    log "{\"status\": \"Success\", \"attached\":true}"
    exit 0
}

op=$1

if [ "$op" = "init" ]; then
    log "{\"status\": \"Success\"}"
    exit 0
fi

if [ $# -lt 2 ]; then
    usage
fi

shift

case "$op" in
    attach)
        attach $*
        ;;
    detach)
        detach $*
        ;;
    waitforattach)
        waitforattach $*
        ;;
    mountdevice)
        domountdevice $*
        ;;
    unmountdevice)
        unmountdevice $*
        ;;
    isattached)
        isattached $*
        ;;
    *)
        log "{ \"status\": \"Not supported\" }"
        exit 0
esac

exit 1
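Every operation in the script reports its result as a small JSON object with a "status" field and, depending on the call, "message", "device", or "attached". A caller could run the driver and interpret that output roughly as follows. This is a hedged sketch using only the Go standard library: `driverStatus`, `parseDriverOutput`, and `callDriver` are hypothetical names, and the kubelet's own flexvolume code is organized differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os/exec"
)

// driverStatus mirrors the JSON fields the lvm script prints.
type driverStatus struct {
	Status   string `json:"status"`
	Message  string `json:"message"`
	Device   string `json:"device"`
	Attached bool   `json:"attached"`
}

// parseDriverOutput decodes the driver's JSON and treats any status other
// than "Success" as an error.
func parseDriverOutput(out []byte) (*driverStatus, error) {
	var st driverStatus
	if err := json.Unmarshal(out, &st); err != nil {
		return nil, fmt.Errorf("driver returned invalid JSON: %v", err)
	}
	if st.Status != "Success" {
		return &st, fmt.Errorf("driver status %q: %s", st.Status, st.Message)
	}
	return &st, nil
}

// callDriver runs the driver executable with an operation and its arguments.
// CombinedOutput is used because the script writes failures to stderr.
func callDriver(driverPath string, args ...string) (*driverStatus, error) {
	out, err := exec.Command(driverPath, args...).CombinedOutput()
	if err != nil && len(out) == 0 {
		return nil, fmt.Errorf("running driver: %v", err)
	}
	return parseDriverOutput(out)
}

func main() {
	// Parse sample outputs instead of executing a real driver.
	ok, err := parseDriverOutput([]byte(`{"status": "Success", "device":"/dev/mapper/vg-vol"}`))
	fmt.Println(ok.Device, err)

	_, err = parseDriverOutput([]byte(`{"status": "Failure", "message": "Volume vol does not exist"}`))
	fmt.Println(err)
}
```

With a real driver installed, an attach call would look something like `callDriver(path, "attach", jsonParams, nodeName)`, following the usage text printed by the script.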
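Note that there are two mount operations: mountdevice and mount. What is each one for?
In k8s a volume can be mounted by multiple pods. If a device backs the volume of several pods, it has to be mounted several times, yet the device itself can be mounted only once. So k8s first uses mountdevice to mount the device at a global directory, and that global directory can then be mounted any number of times into the pods' volume directories. This is how multiple pods can mount the same volume.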
The reconciler's operations map to flexvolume driver calls as follows:
AttachVolume: calls attach
DetachVolume: calls detach
MountVolume: calls mountdevice, then mount
UnmountVolume: calls unmount
UnmountDevice: calls unmountdevice
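The reconciler's rules, in summary:
If the desired state contains no binding between a volume and a pod, detach the volume and unmount it from the pod.
If the desired state contains a binding between a volume and a pod, attach the volume and mount it for the pod.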
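The two states on which kubelet's volume management is based:
DesiredStateOfWorld: how pods are expected to use volumes, or the desired state for short. Once a pod.yaml that defines its volumes has been submitted successfully, the desired state is determined.
ActualStateOfWorld: how pods actually use volumes, or the actual state for short. The actual state is what kubelet's background threads observe.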
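The two phases of startKubelet:
Continuously sync pod information from the apiserver, and update volume state according to the pods that are added or deleted.
Start the server and listen for requests from the controller manager. The controller manager can assist kubelet in managing volumes, and users can also choose to disable management by the controller manager.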
Only with an understanding of the volume manager code can you comfortably use the volume plugins it provides or implement a custom flex volume plugin. All of the code above is based on k8s v1.6.6.