Notes on Reading the Kubernetes Storage-Related Source Code

Posted by 爱折腾的工程师 on Monday, March 15, 2021
  • Kubernetes version: v1.18.3

Kubernetes storage-related source code

The storage-related code in the Kubernetes source tree includes:

  • PersistentVolumeBinderController
  • AttachDetachController
  • VolumeExpandController
  • PVCProtectionController
  • PVProtectionController
  • CSI
  • kubelet's VolumeManager

How exactly are these pieces of code related to each other, and how do they work together?

1. Controllers in kube-controller-manager

kube-controller-manager is responsible for starting a whole series of controllers; essentially every built-in Kubernetes resource object has a corresponding controller implementation, so the startup entry functions of the controllers for built-in resources live in the kube-controller-manager code.
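
To make the relationship concrete, below is a minimal, self-contained sketch of that registration pattern. The types and names are simplified stand-ins; in the real tree, cmd/kube-controller-manager/app keeps a map from controller names to start functions, with an entry for the persistent volume binder among many others.

package main

import "fmt"

// InitFunc mirrors the shape of a controller start function: one entry point
// per controller that the manager invokes at startup.
type InitFunc func(stop <-chan struct{}) error

func main() {
    // Simplified registry; the real map contains dozens of controllers,
    // including the storage-related ones discussed in this post.
    controllers := map[string]InitFunc{
        "persistentvolume-binder": func(stop <-chan struct{}) error {
            fmt.Println("starting persistent volume binder controller")
            return nil
        },
        // "attachdetach", "pvc-protection", "pv-protection", ... would follow.
    }

    stop := make(chan struct{})
    for name, start := range controllers {
        if err := start(stop); err != nil {
            fmt.Printf("failed to start %q: %v\n", name, err)
        }
    }
    close(stop)
}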

1.1 PersistentVolumeBinderController

Startup entry function

func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
    //Collect all PV plugins, i.e. the list of volume plugins that implement the VolumePlugin interface; recycling of hostPath and NFS volumes can be configured separately
    plugins, err := ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
    if err != nil {
        return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
    }
    //Build the parameters for the persistentvolume controller
    params := persistentvolumecontroller.ControllerParameters{
        KubeClient:                ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
        SyncPeriod:                ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
        VolumePlugins:             plugins,
        Cloud:                     ctx.Cloud,
        ClusterName:               ctx.ComponentConfig.KubeCloudShared.ClusterName,
        VolumeInformer:            ctx.InformerFactory.Core().V1().PersistentVolumes(),
        ClaimInformer:             ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
        ClassInformer:             ctx.InformerFactory.Storage().V1().StorageClasses(),
        PodInformer:               ctx.InformerFactory.Core().V1().Pods(),
        NodeInformer:              ctx.InformerFactory.Core().V1().Nodes(),
        EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
    }
    //Create the PersistentVolumeController
    volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
    if volumeControllerErr != nil {
        return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
    } 
    //Start the volumeController
    go volumeController.Run(ctx.Stop)
    return nil, true, nil
}
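
The params above are all wired from a shared informer factory. For reference, here is a standalone sketch of how the same informers could be built with client-go outside of kube-controller-manager; the kubeconfig path is a placeholder and this is not the controller-manager's actual wiring.

package informersketch

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// newStorageInformers builds a shared informer factory and touches the same
// typed informers that startPersistentVolumeBinderController hands to the
// controller via ControllerParameters.
func newStorageInformers(kubeconfigPath string) (informers.SharedInformerFactory, error) {
    cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
    if err != nil {
        return nil, err
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        return nil, err
    }
    factory := informers.NewSharedInformerFactory(client, 15*time.Second)

    // Requesting the typed informers registers them with the factory, just
    // like the ctx.InformerFactory.Core().V1()... calls above.
    _ = factory.Core().V1().PersistentVolumes()
    _ = factory.Core().V1().PersistentVolumeClaims()
    _ = factory.Storage().V1().StorageClasses()
    _ = factory.Core().V1().Pods()
    _ = factory.Core().V1().Nodes()
    return factory, nil
}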

Initializing the PersistentVolumeController object

// NewController creates a new PersistentVolume controller
func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
    eventRecorder := p.EventRecorder
    if eventRecorder == nil {
        broadcaster := record.NewBroadcaster()
        broadcaster.StartLogging(klog.Infof)
        broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
        eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
    }

    controller := &PersistentVolumeController{
        //Why keep local caches of volumes and claims: they hold the last known version of each object;
        //without them, the volume.Status and claim read from the informers could be a stale, older version
        volumes:                       newPersistentVolumeOrderedIndex(),
        claims:                        cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
        kubeClient:                    p.KubeClient,
        eventRecorder:                 eventRecorder,
        runningOperations:             goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
        cloud:                         p.Cloud,
        enableDynamicProvisioning:     p.EnableDynamicProvisioning,
        clusterName:                   p.ClusterName,
        createProvisionedPVRetryCount: createProvisionedPVRetryCount,
        createProvisionedPVInterval:   createProvisionedPVInterval,
        claimQueue:                    workqueue.NewNamed("claims"),
        volumeQueue:                   workqueue.NewNamed("volumes"),
        resyncPeriod:                  p.SyncPeriod,
        operationTimestamps:           metrics.NewOperationStartTimeCache(),
    }
    //Initialize all volume plugins
    // Prober is nil because PV is not aware of Flexvolume.
    if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {
        return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
    }
    //Register event handlers on the VolumeInformer
    p.VolumeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
            UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
            DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
        },
    )
    controller.volumeLister = p.VolumeInformer.Lister()
    controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
    //Register event handlers on the ClaimInformer
    p.ClaimInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
            UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
            DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
        },
    )
    controller.claimLister = p.ClaimInformer.Lister()
    controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced

    controller.classLister = p.ClassInformer.Lister()
    controller.classListerSynced = p.ClassInformer.Informer().HasSynced
    controller.podLister = p.PodInformer.Lister()
    controller.podListerSynced = p.PodInformer.Informer().HasSynced
    controller.NodeLister = p.NodeInformer.Lister()
    controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced

    csiTranslator := csitrans.New()
    controller.translator = csiTranslator
    controller.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator)

    return controller, nil
}
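
The comment near the top of the struct explains why the controller keeps its own volumes/claims stores: they hold the latest known version of each object, so a lagging informer cannot roll the controller back to stale state. A minimal sketch of that idea with simplified types (the real helper, storeObjectUpdate, compares resource versions before overwriting the cached copy):

package main

import (
    "fmt"
    "strconv"
)

// cachedObject stands in for a PV or PVC: only the fields needed to show the
// "keep the newest known version" rule.
type cachedObject struct {
    Name            string
    ResourceVersion string
    Phase           string
}

// storeIfNewer overwrites the cached copy only when the incoming object has a
// resource version at least as new, so stale informer events never win.
func storeIfNewer(cache map[string]cachedObject, obj cachedObject) bool {
    if old, found := cache[obj.Name]; found {
        oldRV, _ := strconv.ParseUint(old.ResourceVersion, 10, 64)
        newRV, _ := strconv.ParseUint(obj.ResourceVersion, 10, 64)
        if newRV < oldRV {
            return false // stale update, keep what we already have
        }
    }
    cache[obj.Name] = obj
    return true
}

func main() {
    cache := map[string]cachedObject{}
    storeIfNewer(cache, cachedObject{Name: "pv-1", ResourceVersion: "10", Phase: "Bound"})
    updated := storeIfNewer(cache, cachedObject{Name: "pv-1", ResourceVersion: "7", Phase: "Available"})
    fmt.Println(updated, cache["pv-1"].Phase) // false Bound
}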

Starting the volumeController

// Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
    //Deferred cleanup
    defer utilruntime.HandleCrash()
    defer ctrl.claimQueue.ShutDown()
    defer ctrl.volumeQueue.ShutDown()

    klog.Infof("Starting persistent volume controller")
    defer klog.Infof("Shutting down persistent volume controller")

    //Wait for the informers to finish syncing a copy of the data from the API server into their caches
    if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
        return
    }
    //Pre-populate the controller's own volumes/claims caches from the PersistentVolume/PersistentVolumeClaim listers
    ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
    //Periodic resync: re-enqueue all PV/PVC keys
    go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
    //Worker loop: fetch PVs from the PersistentVolumes informer cache and keep ctrl.volumes.store in sync
    go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
    //Worker loop: fetch PVCs from the PersistentVolumeClaims informer cache and keep ctrl.claims in sync
    go wait.Until(ctrl.claimWorker, time.Second, stopCh)
    //Register metrics with Prometheus
    metrics.Register(ctrl.volumes.store, ctrl.claims)

    <-stopCh
}
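
The resync loop simply re-enqueues every PV and PVC known to the listers, so each object gets re-evaluated periodically even when no event arrives. A toy model of that pattern, with a buffered channel standing in for the real workqueue:

package main

import "fmt"

// enqueue mimics ctrl.enqueueWork: derive a "namespace/name" key and add it
// to the queue.
func enqueue(queue chan<- string, namespace, name string) {
    key := name
    if namespace != "" {
        key = namespace + "/" + name
    }
    queue <- key
}

// resync re-adds every known claim and volume, which is what
// go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh) achieves periodically.
func resync(claimQueue, volumeQueue chan string, claims, volumes []string) {
    for _, c := range claims {
        enqueue(claimQueue, "default", c) // PVCs are namespaced
    }
    for _, v := range volumes {
        enqueue(volumeQueue, "", v) // PVs are cluster-scoped
    }
}

func main() {
    claimQueue := make(chan string, 10)
    volumeQueue := make(chan string, 10)
    resync(claimQueue, volumeQueue, []string{"data-pvc"}, []string{"pv-1"})
    fmt.Println(<-claimQueue, <-volumeQueue) // default/data-pvc pv-1
}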

The volumeWorker logic

// volumeWorker processes items from volumeQueue. It must run only once,
// syncVolume is not assured to be reentrant.
func (ctrl *PersistentVolumeController) volumeWorker() {
    workFunc := func() bool {
        keyObj, quit := ctrl.volumeQueue.Get()
        if quit {
            return true
        }
        //Mark the key as processed when done
        defer ctrl.volumeQueue.Done(keyObj)
        key := keyObj.(string)
        klog.V(5).Infof("volumeWorker[%s]", key)
        //Split the key into its namespace and name parts
        _, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
            return false
        }
        //Fetch the volume from the informer cache
        volume, err := ctrl.volumeLister.Get(name)
        if err == nil {
            // The volume still exists in informer cache, the event must have
            // been add/update/sync
            //It still exists, so handle it as an update
            ctrl.updateVolume(volume)
            return false
        }
        if !errors.IsNotFound(err) {
            klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
            return false
        }

        // The volume is not in informer cache, the event must have been
        // "delete"
        volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
        if err != nil {
            klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
            return false
        }
        if !found {
            // The controller has already processed the delete event and
            // deleted the volume from its cache
            klog.V(2).Infof("deletion of volume %q was already processed", key)
            return false
        }
        volume, ok := volumeObj.(*v1.PersistentVolume)
        if !ok {
            klog.Errorf("expected volume, got %+v", volumeObj)
            return false
        }
        //The volume is no longer in the informer cache, so delete it from ctrl.volumes.store
        ctrl.deleteVolume(volume)
        return false
    }
    for {
        if quit := workFunc(); quit {
            klog.Infof("volume worker queue shutting down")
            return
        }
    }
}
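
The interesting part of the worker is how it detects deletions: if a key is gone from the informer cache but still present in the controller's own store, the event must have been a delete. A condensed illustration with plain maps standing in for the informer cache and ctrl.volumes.store:

package main

import "fmt"

// processKey mirrors volumeWorker's decision: present in the informer cache
// means add/update/sync; absent from the informer but present in the local
// store means delete; absent from both means the delete was already handled.
func processKey(key string, informerCache, localStore map[string]string) string {
    if _, ok := informerCache[key]; ok {
        return "sync"
    }
    if _, ok := localStore[key]; ok {
        delete(localStore, key)
        return "delete"
    }
    return "already processed"
}

func main() {
    informer := map[string]string{"pv-1": "Bound"}
    local := map[string]string{"pv-1": "Bound", "pv-2": "Released"}
    fmt.Println(processKey("pv-1", informer, local)) // sync
    fmt.Println(processKey("pv-2", informer, local)) // delete
    fmt.Println(processKey("pv-3", informer, local)) // already processed
}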

Main processing logic

volumeWorker -> updateVolume -> syncVolume
// syncVolume is the main controller method to decide what to do with a volume.
// It's invoked by appropriate cache.Controller callbacks when a volume is
// created, updated or periodically synced. We do not differentiate between
// these events.
func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))

    // Set correct "migrated-to" annotations on PV and update in API server if
    // necessary
    //Decide whether to add the pv.kubernetes.io/migrated-to annotation (in-tree to CSI migration)
    newVolume, err := ctrl.updateVolumeMigrationAnnotations(volume)
    if err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
    }
    volume = newVolume

    // [Unit test set 4]
    //The PV is not bound to any PVC
    if volume.Spec.ClaimRef == nil {
        // Volume is unused
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
        //Update the PV phase to Available
        if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
            // Nothing was saved; we will fall back into the same
            // condition in the next call to this method
            return err
        }
        return nil
    } else /* pv.Spec.ClaimRef != nil */ {
        // Volume is bound to a claim.
        //The PV references a PVC, but that PVC is not yet bound to this PV
        if volume.Spec.ClaimRef.UID == "" {
            // The PV is reserved for a PVC; that PVC has not yet been
            // bound to this PV; the PVC sync will handle it.
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            //Update the PV phase to Available
            if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        }
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
        // Get the PVC by _name_
        var claim *v1.PersistentVolumeClaim
        claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
        //Look up the PVC referenced by this PV in the locally maintained claims cache
        obj, found, err := ctrl.claims.GetByKey(claimName)
        if err != nil {
            return err
        }
        //Not found in the claims cache; if the PV carries the pv.kubernetes.io/bound-by-controller annotation
        if !found && metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
            // If PV is bound by external PV binder (e.g. kube-scheduler), it's
            // possible on heavy load that corresponding PVC is not synced to
            // controller local cache yet. So we need to double-check PVC in
            //   1) informer cache
            //   2) apiserver if not found in informer cache
            // to make sure we will not reclaim a PV wrongly.
            // Note that only non-released and non-failed volumes will be
            // updated to Released state when PVC does not exist.
            //Look the PVC up in the informer cache first; if it is not there, query the API server
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
                if err != nil && !apierrors.IsNotFound(err) {
                    return err
                }
                found = !apierrors.IsNotFound(err)
                if !found {
                    obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
                    if err != nil && !apierrors.IsNotFound(err) {
                        return err
                    }
                    found = !apierrors.IsNotFound(err)
                }
            }
        }
        if !found {
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            // Fall through with claim = nil
        } else {
            var ok bool
            claim, ok = obj.(*v1.PersistentVolumeClaim)
            if !ok {
                return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
            }
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
        }
        //Edge case: the PVC this PV points to was deleted (and possibly recreated), so treat the claim as missing, i.e. claim = nil
        if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
            // The claim that the PV was pointing to was deleted, and another
            // with the same name created.
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            // Treat the volume as bound to a missing claim.
            claim = nil
        }

        if claim == nil {
            // If we get into this block, the claim must have been deleted;
            // NOTE: reclaimVolume may either release the PV back into the pool or
            // recycle it or do nothing (retain)

            // Do not overwrite previous Failed state - let the user see that
            // something went wrong, while we still re-try to reclaim the
            // volume.
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                // Also, log this only once:
                klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
                //Update the PV phase to Released
                if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                    // Nothing was saved; we will fall back into the same condition
                    // in the next call to this method
                    return err
                }
            }
            //Reclaim the volume according to its reclaim policy
            //This goes through the goroutinemap package, which manages goroutines by name
            if err = ctrl.reclaimVolume(volume); err != nil {
                // Release failed, we will fall back into the same condition
                // in the next call to this method
                return err
            }
            return nil
        } else if claim.Spec.VolumeName == "" {
            //Check that the PVC and PV volumeModes match (Filesystem vs Block)
            if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
                // Binding for the volume won't be called in syncUnboundClaim,
                // because findBestMatchForClaim won't return the volume due to volumeMode mismatch.
                volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
                ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
                claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
                ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
                // Skipping syncClaim
                return nil
            }
            //The volume carries the pv.kubernetes.io/bound-by-controller annotation
            if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
                // The binding is not completed; let PVC sync handle it
                klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
            } else {
                // Dangling PV; try to re-establish the link in the PVC sync
                klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
            }
            // In both cases, the volume is Bound and the claim is Pending.
            // Next syncClaim will fix it. To speed it up, we enqueue the claim
            // into the controller, which results in syncClaim to be called
            // shortly (and in the right worker goroutine).
            // This speeds up binding of provisioned volumes - provisioner saves
            // only the new PV and it expects that next syncClaim will bind the
            // claim to it.
            ctrl.claimQueue.Add(claimToClaimKey(claim))
            return nil
        } else if claim.Spec.VolumeName == volume.Name {
            // Volume is bound to a claim properly, update status if necessary
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
            //Update the PV phase to Bound
            if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        } else {
            //Having excluded the cases above, the remaining case is: the PV is bound to a PVC, but that PVC is bound to another PV
            //If the volume carries the pv.kubernetes.io/provisioned-by annotation and its reclaim policy is Delete
            // Volume is bound to a claim, but the claim is bound elsewhere
            if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
                // This volume was dynamically provisioned for this claim. The
                // claim got bound elsewhere, and thus this volume is not
                // needed. Delete it.
                // Mark the volume as Released for external deleters and to let
                // the user know. Don't overwrite existing Failed status!
                if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                    // Also, log this only once:
                    klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
                    //Update the PV phase to Released
                    if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                        // Nothing was saved; we will fall back into the same condition
                        // in the next call to this method
                        return err
                    }
                }
                //Reclaim the volume according to its reclaim policy
                if err = ctrl.reclaimVolume(volume); err != nil {
                    // Deletion failed, we will fall back into the same condition
                    // in the next call to this method
                    return err
                }
                return nil
            } else {
                // Volume is bound to a claim, but the claim is bound elsewhere
                // and it's not dynamically provisioned.
                if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
                    // This is part of the normal operation of the controller; the
                    // controller tried to use this volume for a claim but the claim
                    // was fulfilled by another volume. We did this; fix it.
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
                    //Unbind the PV from the PVC and update the PV phase to Available
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                } else {
                    // The PV must have been created with this ptr; leave it alone.
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
                    // This just updates the volume phase and clears
                    // volume.Spec.ClaimRef.UID. It leaves the volume pre-bound
                    // to the claim.
                    //Unbind the PV from the PVC and update the PV phase to Available
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                }
            }
        }
    }
}
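
Almost every branch above ends in updateVolumePhase. As a point of reference, a simplified stand-in for that helper could look like the sketch below; it assumes a client-go clientset, and the real method additionally writes the returned object back into the controller's local volumes cache.

package pvsketch

import (
    "context"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// updatePVPhase clones the PV, sets the new phase and writes only the status
// subresource back to the API server, skipping the call when nothing changes.
func updatePVPhase(client kubernetes.Interface, volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, message string) (*v1.PersistentVolume, error) {
    if volume.Status.Phase == phase {
        return volume, nil // already in the desired phase
    }
    clone := volume.DeepCopy()
    clone.Status.Phase = phase
    clone.Status.Message = message
    return client.CoreV1().PersistentVolumes().UpdateStatus(context.TODO(), clone, metav1.UpdateOptions{})
}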

The claimWorker logic

// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {
    workFunc := func() bool {
        keyObj, quit := ctrl.claimQueue.Get()
        if quit {
            return true
        }
        defer ctrl.claimQueue.Done(keyObj)
        key := keyObj.(string)
        klog.V(5).Infof("claimWorker[%s]", key)

        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
            return false
        }
        //Fetch the claim from the informer cache
        claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
        if err == nil {
            // The claim still exists in informer cache, the event must have
            // been add/update/sync
            ctrl.updateClaim(claim)
            return false
        }
        if !errors.IsNotFound(err) {
            klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
            return false
        }

        // The claim is not in informer cache, the event must have been "delete"
        claimObj, found, err := ctrl.claims.GetByKey(key)
        if err != nil {
            klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
            return false
        }
        if !found {
            // The controller has already processed the delete event and
            // deleted the claim from its cache
            klog.V(2).Infof("deletion of claim %q was already processed", key)
            return false
        }
        claim, ok := claimObj.(*v1.PersistentVolumeClaim)
        if !ok {
            klog.Errorf("expected claim, got %+v", claimObj)
            return false
        }
        ctrl.deleteClaim(claim)
        return false
    }
    for {
        if quit := workFunc(); quit {
            klog.Infof("claim worker queue shutting down")
            return
        }
    }
}

Main processing logic

claimWorker -> updateClaim -> syncClaim
// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
    klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

    // Set correct "migrated-to" annotations on PVC and update in API server if
    // necessary
    //Update the claim's migration annotations (in-tree to CSI migration)
    newClaim, err := ctrl.updateClaimMigrationAnnotations(claim)
    if err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
    }
    claim = newClaim

    if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
        //Sync a claim that has not completed binding
        return ctrl.syncUnboundClaim(claim)
    } else {
        //Sync a claim that has already been bound
        return ctrl.syncBoundClaim(claim)
    }
}
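
The dispatch hinges on the pv.kubernetes.io/bind-completed annotation. For reference, the annotation keys that keep appearing in syncClaim/syncVolume are listed below (the values reflect my reading of the pvutil package in this release), together with the dispatch check:

package pvannotations

// Annotation keys used by the persistent volume controller.
const (
    // Set on a PVC once the binder has finished binding it to a PV.
    AnnBindCompleted = "pv.kubernetes.io/bind-completed"
    // Set on a PV/PVC when the controller, not the user, created the binding.
    AnnBoundByController = "pv.kubernetes.io/bound-by-controller"
    // Set on a dynamically provisioned PV; the value is the provisioner name.
    AnnDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"
    // Set when an in-tree volume plugin has been migrated to a CSI driver.
    AnnMigratedTo = "pv.kubernetes.io/migrated-to"
)

// bindCompleted mirrors the dispatch in syncClaim: a claim without
// AnnBindCompleted goes through syncUnboundClaim, otherwise syncBoundClaim.
func bindCompleted(annotations map[string]string) bool {
    _, ok := annotations[AnnBindCompleted]
    return ok
}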

The syncUnboundClaim function

// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
    // This is a new PVC that has not completed binding
    // OBSERVATION: pvc is "Pending"
    //The PVC is not yet bound to a PV
    if claim.Spec.VolumeName == "" {
        // User did not care which PV they get.
        //The PVC does not name a volume, so check whether its StorageClass uses the WaitForFirstConsumer (delayed) binding mode
        //A StorageClass binds PVCs and PVs either 1. immediately, or 2. only once a pod uses the claim
        delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
        if err != nil {
            return err
        }

        // [Unit test set 1]
        //Find the PV that best matches the PVC from pvIndex, an index built on access modes:
        //1. collect all matching accessModes
        //2. filter the volumes by []v1.PersistentVolumeAccessMode
        //3. pick the best-matching volume (the one whose capacity is closest to the request)
        volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
        if err != nil {
            klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
            return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
        }
        if volume == nil {
            //No matching PV was found
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
            // No PV could be found
            // OBSERVATION: pvc is "Pending", will retry
            switch {
            //Is the PVC using delayed binding?
            case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
                ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
            case v1helper.GetPersistentVolumeClaimClass(claim) != "":
                //No matching PV was found, so continue with provisioning a volume:
                //1. with an external provisioner, or
                //2. with an in-tree provisioner
                if err = ctrl.provisionClaim(claim); err != nil {
                    return err
                }
                return nil
            default:
                ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
            }

            // Mark the claim as Pending and try to find a match in the next
            // periodic syncClaim
            //Update the claim phase to Pending
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else /* pv != nil */ {
            //A matching PV was found
            // Found a PV for this claim
            // OBSERVATION: pvc is "Pending", pv is "Available"
            claimKey := claimToClaimKey(claim)
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
            //Bind the PV and the PVC to each other
            if err = ctrl.bind(volume, claim); err != nil {
                // On any error saving the volume or the claim, subsequent
                // syncClaim will finish the binding.
                // record count error for provision if exists
                // timestamp entry will remain in cache until a success binding has happened
                metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
                return err
            }
            // OBSERVATION: claim is "Bound", pv is "Bound"
            // if exists a timestamp entry in cache, record end to end provision latency and clean up cache
            // End of the provision + binding operation lifecycle, cache will be cleaned by "RecordMetric"
            // [Unit test 12-1, 12-2, 12-4]
            metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
            return nil
        }
    } else /* pvc.Spec.VolumeName != nil */ {
        // [Unit test set 2]
        // User asked for a specific PV.
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
        obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
        if err != nil {
            return err
        }
        if !found {
            // User asked for a PV that does not exist.
            // OBSERVATION: pvc is "Pending"
            // Retry later.
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
            //The volume is not in the locally maintained volumes cache; update the claim phase to Pending
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else {
            //The volume was found in the locally maintained volumes cache
            volume, ok := obj.(*v1.PersistentVolume)
            if !ok {
                return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
            }
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
            //PVC phase "Pending", PV phase "Available"
            if volume.Spec.ClaimRef == nil {
                // User asked for a PV that is not claimed
                // OBSERVATION: pvc is "Pending", pv is "Available"
                klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
                //Check whether the PV satisfies the PVC's requirements
                if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
                    klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
                    // send an event
                    msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
                    // volume does not satisfy the requirements of the claim
                    //Update the claim phase to Pending
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                //Bind the PV and the PVC to each other
                } else if err = ctrl.bind(volume, claim); err != nil {
                    // On any error saving the volume or the claim, subsequent
                    // syncClaim will finish the binding.
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                return nil
            } else if pvutil.IsVolumeBoundToClaim(volume, claim) {
                //PVC phase "Pending", PV phase "Bound"
                // User asked for a PV that is claimed by this PVC
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))

                // Finish the volume binding by adding claim UID.
                //Bind the PV and the PVC to each other
                if err = ctrl.bind(volume, claim); err != nil {
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                //PVC phase "Bound", PV phase "Bound"
                return nil
            } else {
                //The PV points to a different PVC; PVC phase "Pending", PV phase "Bound"
                // User asked for a PV that is claimed by someone else
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
                    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
                    // User asked for a specific PV, retry later
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                    return nil
                } else {
                    // This should never happen because someone had to remove
                    // AnnBindCompleted annotation on the claim.
                    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
                    return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
                }
            }
        }
    }
}
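
The core of findBestMatchForClaim is a smallest-fit search: among the candidate PVs, pick the one whose capacity is closest to (but not below) the request. Below is a deliberately simplified illustration of just that rule; the real pvIndex additionally filters on access modes, volumeMode, StorageClass, node affinity and pre-bound ClaimRefs.

package main

import "fmt"

type pv struct {
    Name       string
    CapacityGi int64
    Bound      bool
}

// findBestMatch returns the unbound PV with the smallest capacity that still
// satisfies the request, i.e. the "closest capacity wins" rule.
func findBestMatch(pvs []pv, requestGi int64) *pv {
    var best *pv
    for i := range pvs {
        candidate := &pvs[i]
        if candidate.Bound || candidate.CapacityGi < requestGi {
            continue
        }
        if best == nil || candidate.CapacityGi < best.CapacityGi {
            best = candidate
        }
    }
    return best
}

func main() {
    pvs := []pv{{"pv-a", 100, false}, {"pv-b", 20, false}, {"pv-c", 10, true}}
    if best := findBestMatch(pvs, 15); best != nil {
        fmt.Println(best.Name) // pv-b
    }
}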

The syncBoundClaim function

// syncBoundClaim is the main controller method to decide what to do with a
// bound claim.
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
    // HasAnnotation(pvc, pvutil.AnnBindCompleted)
    // This PVC has previously been bound
    // OBSERVATION: pvc is not "Pending"
    // [Unit test set 3]
    if claim.Spec.VolumeName == "" {
        // Claim was bound before but not any more.
        //The PVC was Bound before but has now lost its volume reference; mark it Lost
        if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    }
    //Look up the corresponding object in the local volumes cache by the PV name
    obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
    if err != nil {
        return err
    }
    if !found {
        // Claim is bound to a non-existing volume.
        //No matching PV object was found
        if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    } else {
        //A matching PV object was found
        volume, ok := obj.(*v1.PersistentVolume)
        if !ok {
            return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
        }

        klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
        if volume.Spec.ClaimRef == nil {
            //The PVC is bound to the PV, but the PV is not bound to the PVC
            // Claim is bound but volume has come unbound.
            // Or, a claim was bound and the controller has not received updated
            // volume yet. We can't distinguish these cases.
            // Bind the volume again and set all states to Bound.
            klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
            //Bind the PV and the PVC to each other
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else if volume.Spec.ClaimRef.UID == claim.UID {
            //The PVC and the PV are already bound to each other
            // All is well
            // NOTE: syncPV can handle this so it can be left out.
            // NOTE: bind() call here will do nothing in most cases as
            // everything should be already set.
            klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else {
            //The PVC is Bound, but the PV is bound to a different PVC
            // Claim is bound but volume has a different claimant.
            // Set the claim phase to 'Lost', which is a terminal
            // phase.
            if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
                return err
            }
            return nil
        }
    }
}
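
Several branches in both sync paths end up in ctrl.bind(volume, claim). Reconstructing from the behaviour above, it roughly performs four writes in order, each as a separate API update so that an interrupted binding is finished by a later sync pass: bind the PV to the claim, move the PV to Bound, bind the claim to the PV (including the bind-completed annotation), and move the PVC to Bound. A sketch of that ordering on simplified in-memory objects:

package main

import "fmt"

// Simplified PV/PVC types used only to show the ordering of the bind steps.
type volume struct {
    Name     string
    ClaimRef string
    Phase    string
}

type claim struct {
    Namespace, Name string
    VolumeName      string
    Annotations     map[string]string
    Phase           string
}

func bind(v *volume, c *claim) {
    // 1. point the PV at the claim
    v.ClaimRef = c.Namespace + "/" + c.Name
    // 2. PV phase -> Bound
    v.Phase = "Bound"
    // 3. point the PVC at the volume and mark the binding as completed
    c.VolumeName = v.Name
    c.Annotations["pv.kubernetes.io/bind-completed"] = "yes"
    // 4. PVC phase -> Bound
    c.Phase = "Bound"
}

func main() {
    v := &volume{Name: "pv-1", Phase: "Available"}
    c := &claim{Namespace: "default", Name: "data", Annotations: map[string]string{}, Phase: "Pending"}
    bind(v, c)
    fmt.Println(v.ClaimRef, v.Phase, c.VolumeName, c.Phase)
}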

1.2 AttachDetachController

1.3 VolumeExpandController

1.4 PVCProtectionController

1.5 PVProtectionController
