k8s 从头实现加强版 statefulSet - 上篇

k8s 从头实现加强版 statefulSet - 上篇

tk_sky 34 2025-01-09

k8s 从头实现加强版 statefulSet - 上篇

这是为 mcyouyou dev team 实现 unicore 的系列笔记,其中关于云原生的内容基本上业务无关,通用性很强,提取出来作为笔记。

上篇实现原版 sts 的基本内容,下篇会实现一些高级实用特性。

背景

StatefulSet 是 k8s 的核心资源类型之一,用来维护有状态应用在集群内的部署。主要的功能:按特定的顺序启停 pod;维护的 pod 对应的 pod 名称、ip 等不变;提供固定的 pv/pvc,也就是稳定的专属存储。

虽然现在很多服务都倾向于做成无状态的微服务,但是考虑到稳定性/均衡性/和本地存储的关联性/设计上对可扩展和迁移的支持有限等因素,大多数业务是不希望被随意迁移/驱逐的,更别说像 mcyouyou 这样强状态、高存储瓶颈的 mc 服务。因此实践上大部分服务都基于 statefulSet 来部署。

目标

为什么要重写?

  • 写着玩
  • 支持原地升级等非常有诱惑力的高级功能
  • 更精细的升级控制和灰度逻辑,如发布暂停、最大不可用等
  • 支持自定义的存储分配逻辑、流量屏蔽等

综上,对于 unicore 来说,我们需要编写一种新的 statefulSet CRD 和对应的控制器,承接来自集群粒度控制面 deployer 在 CRD 资源上的修改,通过 reconcile 的方式维护下属 pod 的生命周期,同时执行 unicore 集群的自定义存储分配逻辑(openebs 本地卷)、网络配置逻辑和需要的升级控制逻辑等。

理论

总结起来,sts 把有状态应用抽象成了两种情况:拓扑状态和存储状态。

拓扑状态就是应用的多个实例之间是不对等的,会有启动的先后、不同的网络标识等;

存储状态就是不同的实例要对应不同的存储,并且某 pod 在整个生命周期对应的存储都要是一致的。

所以 sts 控制器主要负责保证顺序关系、网络标识和一致的存储。

statefulset 的控制器要负责什么操作?

  • 创建时:

    根据 spec.replicas,按序号从0开始创建 pod,名称按 setName-oridal 的格式,确保 pod 按顺序启动,Ready 后才启动下一个;

    根据 spec.volumeClaimTemplates 来为每个 pod 创建对应的 pvc,名字为 templateName-setName-ordinal 的格式;

    确保每个 pod 绑定到唯一的 pvc。

  • 运行时:

    确保每个 pod 都处于 running+ready,否则将重新创建;如果有异常或被删除,会重新创建。

  • 扩缩容:

    如果 replicas 变化,会按序号从低到高创建或从高到低删除对应的 pod

  • 更新时:

    如果修改了 sts 的 pod 模板,要按序号逐个更新 pod

  • 删除时:

    同时删除所有由其管理的 pod

关于存储卷

sts 控制器在默认配置下会负责为 pod 创建 pvc,但不会负责 pvc 创建后对应 pv 的创建。pvc 的创建会自动触发动态存储的 pv 分配。

但是,sts控制器默认只负责创建 pvc,而不负责删除 pvc。被删除的 pod 绑定的 pvc 仍然保留,数据不会丢失,这样才满足 sts 的 stateful的定义。每个对应序号的 pod 不管生命周期如何变化,都只会绑定到原有的 pvc。

实现

实现参考了阿里开源的 openKruise 套件代码。其实他们也很大程度上使用到了 statefulset 的原版代码。

kruise/apis/apps/v1beta1/statefulset_types.go at master · openkruise/kruise (github.com)

定义 CRD 和控制器

开始实现。首先需要定义一个叫 App 的 crd,直接在已有的 kubeBuilder 项目上添加 api:

kubebuilder create api --group unicore --version v1 --kind App

kubebuilder 不建议同一个项目内设多个group/version,所以用和 Deployer 同一个的即可。

接下来在 api/v1 目录下面就可以找到 App 类型的相关定义,包括 App 本身、Spec 等等。然后,可以在 internal/controller 目录下找到 App 对应的 controller。

编写 CR Spec 和 Status 结构

首先在api/v1定义一下我们新的自定义资源的 Spec(不变的期望属性)和 Status(动态的状态):

// AppSpec defines the desired state of App
type AppSpec struct {
	// desired number of pods, defaults to 1
	// +optional
	Replicas *int32 `json:"replicas,omitempty"`

	// the template of the App's pod to create
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	Template v1.PodTemplateSpec `json:"template"`

	// selects the label that App Controller managed, should match Template
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	Selector *metav1.LabelSelector `json:"selector"`

	// PVC's template, at least one should match Template's volume mnt
	// +optional
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	VolumeClaimTemplates []v1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

	// updateStrategy indicates the AppUpdateStrategy that will be
	// employed to update Pods in the App when a revision is made to
	// Template. Behave the same as sts.
	UpdateStrategy AppUpdateStrategy `json:"updateStrategy,omitempty"`

	// serviceName that app controller to create to manage the pods
	ServiceName string `json:"serviceName,omitempty"`

	// the maximum of history records of the template that app controller keeps
	RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`

	// same as sts. if parallel strategy is set, pod will be managed ignoring the ordinal. if orderedReady is set,
	// pod will be scale/update considering ordinal and wait for former pod's ready status
	PodManagementPolicy apps.PodManagementPolicyType `json:"podManagementPolicy,omitempty"`
}

我们的 App 资源需要维护一种版本机制,能够存储和回溯不同版本的 Spec 编辑。因此我们引入一个 revision version ,这是 k8s 实现的机制,用于 statefulset 和 daemonset 的本身的更新和回滚,已经有一部分可以直接用的逻辑了。

// AppStatus defines the observed state of App
type AppStatus struct {

	// specify the current version of app's spec, comparing to object meta to check if update needed
	ObservedGeneration int64 `json:"observedGeneration,omitempty"`

	// replicas is the number of Pods created by the App controller.
	Replicas int32 `json:"replicas"`

	// readyReplicas is the number of Pods created by the App controller that have a Ready Condition.
	ReadyReplicas int32 `json:"readyReplicas"`

	// AvailableReplicas is the number of Pods created by the App controller that have been ready for
	//minReadySeconds.
	AvailableReplicas int32 `json:"availableReplicas"`

	// currentReplicas is the number of Pods created by the App controller from the App version
	// indicated by currentRevision.
	CurrentReplicas int32 `json:"currentReplicas"`

	// updatedReplicas is the number of Pods created by the App controller from the App version
	// indicated by updateRevision.
	UpdatedReplicas int32 `json:"updatingReplicas"`

	// updatedReadyReplicas is the number of updated Pods created by the StatefulSet controller that have a Ready Condition.
	UpdatedReadyReplicas int32 `json:"updatedReadyReplicas,omitempty"`

	// updatedAvailableReplicas is the number of updated Pods created by the StatefulSet controller that have a Ready condition
	//for atleast minReadySeconds.
	UpdatedAvailableReplicas int32 `json:"updatedAvailableReplicas,omitempty"`

	// currentRevision, if not empty, indicates the version of the App used to generate Pods in the
	// sequence [0,currentReplicas).
	CurrentRevision string `json:"currentRevision"`

	// updateRevision, if not empty, indicates the version of the App used to generate Pods in the sequence
	// [replicas-updatedReplicas,replicas)
	UpdateRevision string `json:"updatingRevision,omitempty"`

	// Represents the latest available observations of a statefulset's current state.
	// +optional
	// +patchMergeKey=type
	// +patchStrategy=merge
	Conditions []apps.StatefulSetCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`

	// LabelSelector is label selectors for query over pods that should match the replica count used by HPA.
	LabelSelector string `json:"labelSelector,omitempty"`

	// used to ensure a unique revision hash
	CollisionCount *int32 `json:"collisionCount,omitempty"`

	// VolumeClaims represents the status of compatibility between existing PVCs
	// and their respective templates. It tracks whether the PersistentVolumeClaims have been updated
	// to match any changes made to the volumeClaimTemplates, ensuring synchronization
	// between the defined templates and the actual PersistentVolumeClaims in use.
	VolumeClaims []VolumeClaimStatus `json:"volumeClaims,omitempty"`
}

编写 pod controller

下面开始重头戏,在 internal/controller 编辑 App 对应的 controller。

在 app_controller.go 可以看到框架为我们生成的 Reconcile 方法。这个方法会在 CUD 事件时被触发,用来调整你的 CRD 状态与预期一致。翻看controller-runtime 代码中 Start 方法,可以看到实际上是在 controller 启动时会注册对 CR 的 list-watch,然后调用这个 Reconcile 来处理 watch 到的事件。

可以看到在这个方法上面已经有了 RBAC 相关的注释,我们加上一些相关的 RBAC 确保其到时候有操作相关对象的权限:

// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete

然后关注到这个方法对应的 struct:

// AppReconciler reconciles a App object
type AppReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

之后我们就需要以这个结构体为基础来维护所有的 App 资源。

在 controller 下新建一个 app 包,用来存放 App 这个 CR 对应的控制逻辑。我们需要两个工具结构体,一个负责对具体的 pod 做操作,一个负责维护 App CR(我们的新 statefulSet)本身的状态变化(如 revision 等)。

我们首先来写控制 statefulset 下 pod 管理的工具结构:

import (
	"context"
	"fmt"
	unicore "github.com/mcyouyou/unicore/api/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	listerv1 "k8s.io/client-go/listers/core/v1"
	storagelisterv1 "k8s.io/client-go/listers/storage/v1"
	"k8s.io/client-go/tools/record"
)

type PodController struct {
	client    clientset.Interface
	podLister listerv1.PodLister
	pvcLister listerv1.PersistentVolumeClaimLister
	scLister  storagelisterv1.StorageClassLister
	recorder  record.EventRecorder
}

func NewPodController(client clientset.Interface, podLister listerv1.PodLister, pvcLister listerv1.PersistentVolumeClaimLister,
	scLister storagelisterv1.StorageClassLister, recorder record.EventRecorder) *PodController {
	return &PodController{
		client:    client,
		podLister: podLister,
		pvcLister: pvcLister,
		scLister:  scLister,
		recorder:  recorder,
	}
}

其中,record.EventRecorder 是 client-go 中与 event 有关的库,负责向 cleint-go 汇报 event。除此之外还要使用到 pod、pvc、sc 的 lister 和 k8s clientSet。

然后实现 CreateStatefulPod 相关方法,用来创建 app 对应的 stateful pod。之所以叫 stateful pod,就是因为它相比普通 pod 有自己的专用槽位(sink),其实就是名字和 pvc 是唯一的。所以这里的主要任务就是创建正确名字的 pod 同时维护对应名字的 pvc:

func (c *PodController) CreateStatefulPod(ctx context.Context, app *unicore.App, pod *v1.Pod) error {
	// create pvc before creating pod
	err := c.createPVC(app, pod)
	if err != nil {
		c.recordPodEvent("create", app, pod, err)
		return err
	}
	_, err = c.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
	if errors.IsAlreadyExists(err) {
		// this pod of its name has been created
		return err
	}
	c.recordPodEvent("create", app, pod, err)
	return err
}

// create app-specified pvc for its pod
func (c *PodController) createPVC(app *unicore.App, pod *v1.Pod) error {
	for _, pvcTemplate := range getPVCFromApp(app, pod) {
		pvc, err := c.pvcLister.PersistentVolumeClaims(pvcTemplate.Namespace).Get(pvcTemplate.Name)
		if errors.IsNotFound(err) {
			_, err := c.client.CoreV1().PersistentVolumeClaims(pvcTemplate.Namespace).Create(context.TODO(),
				pvcTemplate, metav1.CreateOptions{})
			if err != nil {
				return fmt.Errorf("create pvc %s err:%v", pvcTemplate.Name, err)
			}
		} else if err != nil {
			return fmt.Errorf("get pvc %s err:%v", pvcTemplate.Name, err)
		} else if pvc.DeletionTimestamp != nil {
			// this pvc is set to be graceful deleted
			return fmt.Errorf("pvc %s is to be deleted", pvcTemplate.Name)
		}
	}
	return nil
}

func (c *PodController) recordPodEvent(verb string, app *unicore.App, pod *v1.Pod, err error) {
	if err == nil {
		reason := fmt.Sprintf("Succ %s", verb)
		msg := fmt.Sprintf("%s pod %s of app %s successful", verb, pod.Name, app.Name)
		c.recorder.Event(app, v1.EventTypeNormal, reason, msg)
	} else {
		reason := fmt.Sprintf("Failed %s", verb)
		msg := fmt.Sprintf("%s pod %s of app %s failed: %v", verb, pod.Name, app.Name, err)
		c.recorder.Event(app, v1.EventTypeWarning, reason, msg)
	}
}

处理 pod 和 pvc 名字的 util:

// get pvc to-create from app.Spec.VolumeClaimTemplates
func getPVCFromApp(app *unicore.App, pod *v1.Pod) map[string]*v1.PersistentVolumeClaim {
	_, ordinal := getPodAppNameAndOrdinal(pod)
	pvcs := make(map[string]*v1.PersistentVolumeClaim, len(app.Spec.VolumeClaimTemplates))
	for _, pvc := range app.Spec.VolumeClaimTemplates {
		// set pvc name as pvc-app-ordinal
		pvc.Name = getPVCOutName(app, &pvc, ordinal)
		pvc.Namespace = app.Namespace
		pvc.Labels = app.Spec.Selector.MatchLabels
		pvcs[pvc.Name] = &pvc
	}
	return pvcs
}

// get pod's app name and the pod's ordinal
func getPodAppNameAndOrdinal(pod *v1.Pod) (string, int) {
	parts := strings.Split(pod.Name, "-")
	if len(parts) != 2 {
		return "", -1
	}
	if i, err := strconv.Atoi(parts[1]); err == nil {
		return parts[0], i
	}
	return "", -1
}

// get pvc's to-create name for given ordinal
func getPVCOutName(app *unicore.App, pvc *v1.PersistentVolumeClaim, ordinal int) string {
	return fmt.Sprintf("%s-%s-%d", app.Name, pvc.Name, ordinal)
}

然后是 update 方法了。但 update 不能直接就去 update 一个 pod,statefulset 也要负责维护 pod 的 pvc 应满足 app 下定义的要求,同时也必须验证这个 pod 的属性是否已设为归属这个 app,如果没有要做对应的更新,包括名字、命名空间、label之类。

除此之外,update 一个资源时 k8s 采用乐观锁的方式来处理冲突,会在实际执行 update 时比较版本号并以冲突 err 返回的形式报错,所以需要在更新时正确判断发生了 conflict error,然后进行重试。所以 client-go 的 util/retry 包提高了 RetryOnConflict 方法,如果你的 func 返回的是 conflict error,就会进行重试。

func (c *PodController) UpdateStatefulPod(ctx context.Context, app *unicore.App, pod *v1.Pod) error {
	triedUpdate := false
	// use this retry func to update to avoid conflict
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		needUpdate := false
		// check if pod belong to app
		if !matchAppAndPod(app, pod) {
			updatePodIdentity(app, pod)
			needUpdate = true
		}
		// create pvc and update pod's volume if pod's volume dont match the app's requirement
		if matchAppPVC(app, pod) {
			updatePodVolume(app, pod)
			needUpdate = true
			if err := c.createPVC(app, pod); err != nil {
				c.recordPodEvent("update", app, pod, err)
				return err
			}
		}

		if needUpdate {
			triedUpdate = true
			_, err := c.client.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
			if err != nil {
				// may be conflict, re-get the pod
				newPod, err2 := c.client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
				if err2 != nil {
					log.Errorf("get to-update pod %s err: %v", pod.Name, err2)
				} else {
					// use deep copy to avoid affecting cached pod data
					pod = newPod.DeepCopy()
				}
				return err
			}
		}
		return nil
	})
	if triedUpdate {
		c.recordPodEvent("update", app, pod, err)
	}
	return err
}

一些检查pod身份和pvc是否满足要求的工具方法:

// check if the pod is a member of app
func matchAppAndPod(app *unicore.App, pod *v1.Pod) bool {
	appName, ordinal := getPodAppNameAndOrdinal(pod)
	if ordinal < 0 {
		return false
	}
	return app.Name == appName && pod.Name == getPodOutName(app, ordinal) && pod.Namespace == app.Namespace &&
		pod.Labels[LabelPodOwnerApp] == app.Name
}

// set pod's identity to match app
func updatePodIdentity(app *unicore.App, pod *v1.Pod) {
	_, ordinal := getPodAppNameAndOrdinal(pod)
	pod.Name = getPodOutName(app, ordinal)
	pod.Namespace = app.Namespace
	if pod.Labels == nil {
		pod.Labels = make(map[string]string)
	}
	pod.Labels[LabelPodOwnerApp] = app.Name
	pod.Labels[LabelPodOrdinal] = strconv.Itoa(ordinal)
}

// check if the pod's current volumes match the app's requirement
func matchAppPVC(app *unicore.App, pod *v1.Pod) bool {
	_, ordinal := getPodAppNameAndOrdinal(pod)
	if ordinal < 0 {
		return false
	}
	podVolumes := make(map[string]v1.Volume, len(pod.Spec.Volumes))
	for _, vol := range pod.Spec.Volumes {
		podVolumes[vol.Name] = vol
	}
	for _, pvc := range app.Spec.VolumeClaimTemplates {
		volume, ok := podVolumes[pvc.Name]
		if !ok || volume.VolumeSource.PersistentVolumeClaim == nil ||
			volume.VolumeSource.PersistentVolumeClaim.ClaimName != getPVCOutName(app, &pvc, ordinal) {
			return false
		}
	}
	return true
}

// update pod volumes to match the app. pod volume with the same name will be overwritten
func updatePodVolume(app *unicore.App, pod *v1.Pod) {
	toCreatePVC := getPVCFromApp(app, pod)
	newVolumes := make([]v1.Volume, 0, len(toCreatePVC))
	for name, pvc := range toCreatePVC {
		newVolumes = append(newVolumes, v1.Volume{
			Name: name,
			VolumeSource: v1.VolumeSource{
				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: pvc.Name, ReadOnly: false},
			},
		})
	}
	for _, v := range pod.Spec.Volumes {
		if _, ok := toCreatePVC[v.Name]; !ok {
			newVolumes = append(newVolumes, v)
		}
	}
	pod.Spec.Volumes = newVolumes
}

至于 delete 部分就很简单了:

func (c *PodController) DeleteStatefulPod(ctx context.Context, app *unicore.App, pod *v1.Pod) error {
	err := c.client.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
	c.recordPodEvent("delete", app, pod, err)
	return err
}

生成客户端

在定义了 CRD 以后,为了能够直接在 client-go 代码里操作 App 资源,还需要利用 k8s 的工具去生成对应 api 定义的客户端,包括 lister、informer 和 clientSet。这样的话后期才可以直接引入生成的代码对自定义资源做控制。

首先要拉取 k8s 的 code generator 工具:go get k8s.io/code-generator,然后写一个脚本,这里手动切换到 code generator 的脚本,再调用它里面的函数:

set -o errexit
set -o nounset
set -o pipefail

SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..

source /root/go/pkg/mod/k8s.io/code-generator@v0.30.1/kube_codegen.sh

THIS_PKG="github.com/mcyouyou/unicore"

kube::codegen::gen_helpers \
    --boilerplate "${SCRIPT_ROOT}/hack/boilerplate.go.txt" \
    "${SCRIPT_ROOT}/api"

kube::codegen::gen_client \
    --with-watch \
    --output-dir "${SCRIPT_ROOT}/pkg/generated" \
    --output-pkg "${THIS_PKG}/pkg/generated" \
    --boilerplate "${SCRIPT_ROOT}/hack/boilerplate.go.txt" \
    "${SCRIPT_ROOT}/api"

这里在函数调用的参数里指定了代码生成的位置,以及对应的 api 的上一层目录。

为了方便,这里额外增加一层 deployer 目录表示这个 api 组的名字,这样才能正确运行代码生成,类似这样:

image-20241127204017296

然后,在 api 目录下增加一个 register.go,里面填上对应的 GroupName:

package api

// GroupName is the group name used in this package
const (
   GroupName = "unicore.mcyou.cn"
)

同时找到对应类型定义的 struct,在上面增加注释 // +genclient

搞定后就可以运行脚本来生成代码了。等待一段时间后,可以看到:

image-20241127205324565

这样就没问题了。

编写 state controller

写完了对 pod 做控制的逻辑,剩下的就是维护 App,也就是我们的 statefulset 自身的状态的逻辑。这里主要涉及到对 statefulset 所管理的 pod 的一个滚动升级的控制。

这里主要涉及一个概念:controllerRevision。这是 k8s statefulset 和 daemonset 控制器内部的一个机制,用来实现滚动升级。这里我们也可以拿来用。其定义如下:

type ControllerRevision struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
	// pod spec template的序列化
	Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`
    // Data字段所代表的template的版本号,这个是递增的字段,如果我们修改了sts的pod template,会产生一个新的controllerrevision,
    // 该字段也会递增1
	Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}

revision 是修订的意思,记录了某一时刻的 pod spec template,是状态数据的不可变快照,所以 revision 中 Data 字段是不可修改的,只能修改 Revision 字段。

这里对应到我们之前在 app status 里定义的字段:currentRevisionupdateRevision,前者是当前 app 最近一次已完成的 update 对应的 revision,即如果当前 app 正在滚动升级,就引入 updateRevision 来对应最新的 revision。如果没有在升级,这两个字段就是一致的。

接下来就是 StateController 本体:

type StateController struct {
	podController *PodController
	recorder      record.EventRecorder
	history       history.Interface
	// client for app
	client    versioned.Interface
	appLister lister.AppLister
}

管理 revision 和 status

ControllerRevision 其实就是对 spec 内容的一个序列化,就像 git 的版本一样可以创建或恢复。这样我们只需要在 pod 处同时维护其对应的 revision,就可以判断该 pod 是否满足期待状态了。这里比较方便的是我们可以直接 import "k8s.io/kubernetes/pkg/controller/history" 来使用它。在实际完成对 App 下管理的 pod 的 reconcile(或者说 update to spec) 操作之前,我们需要先管理好 App 自身的 revision,通过对 apiserver 发送 create update delete 等请求来将其维护在 etcd 里。

这里我们搞一个外部更新方法(UpdateApp())来处理 revision 和 status 相关的更新。

首先我们从 app 的 selector 把对应的 controllerRevisions 给 list 出来 sort 一下;由于 controllerRevision 创建的名称是 hash 过的,可能会有冲突,所以要在 status 处自己维护一个 collisionCnt,传递给 history 包的函数以保持 revision 名称唯一。其次,这里要负责把 app.spec.podSpecTemplate 提取出来,生成 json 格式的 patch,用来应用给创建的每一个 pod。然后指定对象、类别,以提取出的 patch 为 data 来 new 一个新的 revision 对象,再根据是否存在等效的 revision,决定不做任何操作,回滚(Update)到已存在的等效revision,或者 Create 一个新的到 apiserver。之后就可以执行具体针对 pod 的操作 c.applyUpdate() 了。操作完成后,需要同步用 Update 方法更新 App 的 status 里的对应字段,遍历 revision 来清理没有 pod 使用的 revision 版本,删除超过长度限制的 revision,就大功告成了。整体代码如下:

func (c *StateController) UpdateApp(ctx context.Context, app *unicore.App, pods []*v1.Pod) error {
	if app == nil {
		return fmt.Errorf("app is nil")
	}
	// do a copy since it may be modified
	app = app.DeepCopy()
	// list and sort revisions from apiserver
	selector, err := metav1.LabelSelectorAsSelector(app.Spec.Selector)
	if err != nil {
		return err
	}
	revisions, err := c.history.ListControllerRevisions(app, selector)
	if err != nil {
		return err
	}
	history.SortControllerRevisions(revisions)
	collisionCount := int32(0)
	if app.Status.CollisionCount != nil {
		collisionCount = *app.Status.CollisionCount
	}

	// create new revision from to-update app(marshaling)
	patch, err := getAppPatch(app)
	if err != nil {
		return err
	}
	nextRevisionId := int64(1)
	if len(revisions) > 0 {
		nextRevisionId = revisions[len(revisions)-1].Revision + 1
	}
	cr, err := history.NewControllerRevision(app, controllerKind, app.Spec.Template.Labels,
		runtime.RawExtension{Raw: patch}, nextRevisionId, &collisionCount)
	if err != nil {
		return err
	}
	if cr.ObjectMeta.Annotations == nil {
		cr.ObjectMeta.Annotations = make(map[string]string)
	}
	for k, v := range app.Annotations {
		cr.ObjectMeta.Annotations[k] = v
	}

	// check if equivalent revision exists, then decide whether to issue creating or updating(rolling back) to apiserver
	equalRevisions := history.FindEqualRevisions(revisions, cr)
	equalCnt := len(equalRevisions)
	if equalCnt != 0 {
		if history.EqualRevision(revisions[len(revisions)-1], equalRevisions[equalCnt-1]) {
			// newest revision is up to date, nothing changed
			cr = revisions[len(revisions)-1]
		} else {
			// same revision found in history, rollback to it by updating the old revision's RevisionID
			cr, err = c.history.UpdateControllerRevision(equalRevisions[equalCnt-1], cr.Revision)
			if err != nil {
				return err
			}
		}
	} else {
		// issue a creat req to apiserver, adding the collisionCount if hash collision
		cr, err = c.history.CreateControllerRevision(app, cr, &collisionCount)
		if err != nil {
			return err
		}
	}

	// try finding the corresponding revision to the app.status
	updateRevision := cr
	var currentRevision *apps.ControllerRevision
	for i := range revisions {
		if revisions[i].Name == app.Status.CurrentRevision {
			currentRevision = revisions[i]
			break
		}
	}
	if currentRevision == nil {
		currentRevision = updateRevision
	}

	currentStatus, getStatusErr := c.applyUpdate(ctx, app, currentRevision, updateRevision, collisionCount, pods, revisions)
	if getStatusErr != nil && currentStatus == nil {
		return getStatusErr
	}

	updateStatusErr := c.updateAppStatus(ctx, app, currentStatus)
	if updateStatusErr == nil {
		klog.V(4).InfoS("update app status success", "app", klog.KObj(app),
			"replicas", currentStatus.Replicas,
			"readyReplicas", currentStatus.ReadyReplicas,
			"currentReplicas", currentStatus.CurrentReplicas,
			"updatedReplicas", currentStatus.UpdatedReplicas)
	}

	err = nil
	if getStatusErr != nil && updateStatusErr != nil {
		klog.ErrorS(updateStatusErr, "can not update status", "app", klog.KObj(app))
		err = getStatusErr
	} else if getStatusErr != nil {
		err = getStatusErr
	} else if updateStatusErr != nil {
		err = updateStatusErr
	} else {
		klog.V(4).InfoS("update status success", "app", klog.KObj(app),
			"currentRevision", currentStatus.CurrentRevision,
			"updateRevision", currentStatus.UpdateRevision)
	}
	truncateErr := c.truncateHistory(app, pods, revisions, currentRevision, updateRevision)
	if err != nil {
		if truncateErr != nil {
			klog.ErrorS(truncateErr, "can not truncate", "app", klog.KObj(app), "err", truncateErr.Error())
		}
		return err
	}
	return truncateErr
}

核心 Update 逻辑

整个 app 或者 sts 控制器最核心的逻辑就是接下来的 c.applyUpdate() 方法,这里会拆分开来梳理。

首先,做一些准备工作。传入之前找出的 currentRevision 和 updateRevision,将其与当前 app merge 并 decode 得到对应的 currentApp 和 updateApp,更新 app.Status,将传入的 pods 按序号合法性分别放入 replicas 或 invalidPods list。其中 replicas 就是过滤后用来满足 app spec 的 pod list,而 invalidPods list 则是要删除的 pod 列表。但要删除不意味着可以直接删掉它,后面也会根据 stateful 要求做一些控制。

// core procedure of updating the status and pods
func (c *StateController) applyUpdate(ctx context.Context,
	app *unicore.App,
	currentRevision *apps.ControllerRevision,
	updateRevision *apps.ControllerRevision,
	collisionCount int32,
	pods []*v1.Pod,
	revisions []*apps.ControllerRevision) (*unicore.AppStatus, error) {
	selector, err := metav1.LabelSelectorAsSelector(app.Spec.Selector)
	if err != nil {
		return app.Status.DeepCopy(), err
	}

	currentApp, err := ApplyRevision(app, currentRevision)
	if err != nil {
		return app.Status.DeepCopy(), err
	}
	updateApp, err := ApplyRevision(app, updateRevision)
	if err != nil {
		return app.Status.DeepCopy(), err
	}

	status := unicore.AppStatus{}
	status.CurrentRevision = currentRevision.Name
	status.UpdateRevision = updateRevision.Name
	status.ObservedGeneration = app.Generation
	status.CollisionCount = ptr.To[int32](collisionCount)
	status.LabelSelector = selector.String()
	minReadySeconds := getMinReadySeconds(app)
	// update App.Status
	updateStatus(&status, minReadySeconds, currentRevision, updateRevision, pods)

	// the new replica list to maintain
	replicaCnt := int(*app.Spec.Replicas)
	replicas := make([]*v1.Pod, replicaCnt)
	firstUnhealthyOrdinal := math.MaxInt32
	var firstUnhealthyPod *v1.Pod
	invalidPods := make([]*v1.Pod, 0)

	burstable := app.Spec.PodManagementPolicy == apps.ParallelPodManagement
	// filter invalid ordinal pod, fill the pod to replica list with their index
	for i := range pods {
		if _, ord := GetPodAppNameAndOrdinal(pods[i]); ord < replicaCnt {
			replicas[ord] = pods[i]
		} else if ord >= 0 {
			invalidPods = append(invalidPods, pods[i])
		}
	} 
...

接下来找到 replicas 里空的槽位,可能是还未扩容或因为不符合期望状态被删掉的 pod,为其根据扩容策略和当前的 partition 来确定应该创建 updateRevision 的 pod,还是 currentRevision 的 pod。然后根据 ordinal 和判断得到的对应 App 来创建对应的 pod,利用 controller.GetPodFromTemplate 提取出 template,再处理其对应的 pvc 等。

// create extra pod to fill the empty slot of replica list
	// this is where the newer version pod is created
	for i := 0; i < replicaCnt; i++ {
		if replicas[i] == nil {
			replicas[i] = newVersionedPodForApp(currentApp, updateApp, currentRevision.Name, updateRevision.Name,
				i, replicas)
		}
	}
...

// decide and create a pod for an app either from currentRevision or updateRevision
func newVersionedPodForApp(currentApp, updateApp *unicore.App, currentRevision, updateRevision string, ordinal int,
	replicas []*v1.Pod) *v1.Pod {
	if isCurrentRevisionExpected(currentApp, updateRevision, ordinal, replicas) {
		pod := newAppPod(currentApp, ordinal)
		if pod.Labels == nil {
			pod.Labels = make(map[string]string)
		}
		pod.Labels[apps.StatefulSetRevisionLabel] = currentRevision
		return pod
	}
	pod := newAppPod(updateApp, ordinal)
	if pod.Labels == nil {
		pod.Labels = make(map[string]string)
	}
	pod.Labels[apps.StatefulSetRevisionLabel] = updateRevision
	return pod
}

// check if the given ordinal pod is expected to be at currentRevision instead of updateRevision
func isCurrentRevisionExpected(app *unicore.App, updateRevision string, ordinal int, replicas []*v1.Pod) bool {
	// use no rolling update and revisions, from spec data to create pods instead
	if app.Spec.UpdateStrategy.Type != apps.RollingUpdateStatefulSetStrategyType {
		return false
	}
	if app.Spec.UpdateStrategy.RollingUpdate == nil {
		{
			return ordinal < int(app.Status.CurrentReplicas)
		}
	}
	// use ordered update, pods ordinals in [:RollingUpdate.Partition) are expected to be at CurrentRevision
	if !app.Spec.UpdateStrategy.RollingUpdate.UnorderedUpdate {
		return ordinal < int(*app.Spec.UpdateStrategy.RollingUpdate.Partition)
	}
	// if unordered strategy is set, Partition is the expected amount of pods at CurrentRevision
	var currentRevisionCnt int
	for i, pod := range replicas {
		if pod == nil || i == ordinal {
			continue
		}
		if pod.GetLabels()[apps.ControllerRevisionHashLabelKey] != updateRevision {
			currentRevisionCnt++
		}
	}
	return currentRevisionCnt < int(*app.Spec.UpdateStrategy.RollingUpdate.Partition)
}

// creat pod for app from its template
func newAppPod(app *unicore.App, ordinal int) *v1.Pod {
	pod, _ := controller.GetPodFromTemplate(&app.Spec.Template, app, metav1.NewControllerRef(app, controllerKind))
	pod.Name = getPodOutName(app, ordinal)
	updatePodIdentity(app, pod)
	pod.Spec.Hostname = pod.Name
	pod.Spec.Subdomain = app.Spec.ServiceName
	updateVolume(app, pod)
	return pod
}

然后我们按序号的递减顺序 sort 一下 invalidPods,找到两个 list 里序号最小的 unhealthy 的 pod。这是为了之后在清理 invalidPods 的时候,如果不是并行模式,必须先等待之前的 pod 都 ready 了再执行删除,这是 sts 对于 stateful 的要求。

unhealthy := 0
	for i := range replicas {
		if replicas[i] == nil {
			continue
		}
		if !getPodReady(replicas[i]) || replicas[i].DeletionTimestamp != nil {
			unhealthy++
			if _, ord := GetPodAppNameAndOrdinal(replicas[i]); ord < firstUnhealthyOrdinal {
				firstUnhealthyOrdinal = ord
				firstUnhealthyPod = replicas[i]
			}
		}
	}

	for i := len(invalidPods) - 1; i >= 0; i-- {
		if !getPodReady(invalidPods[i]) || invalidPods[i].DeletionTimestamp != nil {
			unhealthy++
			if _, ord := GetPodAppNameAndOrdinal(invalidPods[i]); ord < firstUnhealthyOrdinal {
				firstUnhealthyOrdinal = ord
				firstUnhealthyPod = invalidPods[i]
			}
		}
	}

接下来就是很长的处理 replicas 这个 list 的逻辑了,主要目的是排除失败 pod、等待该等待的 pod、创建未创建的 pod 等,总之是为了使 replica list 符合 app.Spec 的预期(注意,这里的预期只包括这个 pod 在不在,而不在乎它对应的 revision。这个由之后的更新部分来控制)。由于 stateful 要求,我们要先使得 spec 下的 pod 均满足预期且处于 ready 状态,然后我们才能删除 invalid list,所以这里记录一个 allAvailable 值。另外需要注意根据是并行模式还是非并行模式,选择在遇到预期外的 pod 时应当直接退出等待下一次 reconcile 来使 pod 创建有序,还是直接 continue 连续创建多个 pod。

// now we check every pod that should be valid, delete it if failed, create it if not created, and wait until
	// 	all pods are there and match their identities.
	// note that we use pods.Update() to match pod's identity with the app, only editing its labels and annotations,
	// 	not its revision.
	allAvailable := true
	logger := klog.FromContext(ctx)
	var runErr error
	for i := range replicas {
		if replicas[i] == nil {
			continue
		}
		// pods in these two phase should be restarted
		if replicas[i].Status.Phase == v1.PodFailed || replicas[i].Status.Phase == v1.PodSucceeded {
			allAvailable = false
			if replicas[i].DeletionTimestamp == nil {
				if err := c.podController.DeleteStatefulPod(ctx, app, replicas[i]); err != nil {
					c.recorder.Eventf(app, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", replicas[i].Name, err)
					runErr = err
				}
			}
			break
		}

		// if pod not created, create one
		if replicas[i].Status.Phase == "" {
			allAvailable = false
			if err := c.podController.CreateStatefulPod(ctx, app, replicas[i]); err != nil {
				condition := apps.StatefulSetCondition{
					Type:    unicore.ConditionFailCreatePod,
					Status:  v1.ConditionTrue,
					Message: fmt.Sprintf("failed to create pod %s: %v", replicas[i].Name, err),
				}
				setAppCondition(&status, condition)
				runErr = err
			}
			// if burstable not allowed, break to make a monotonic creation
			if !burstable {
				break
			} else {
				continue
			}
		}

		if replicas[i].Status.Phase == v1.PodPending {
			allAvailable = false
			logger.V(4).Info("App is creating pvc for pending pod", "app", klog.KObj(app), klog.KObj(replicas[i]))
			if err := c.podController.createPVC(app, replicas[i]); err != nil {
				runErr = err
				break
			}
		}

		// no bursting: for terminating pod: wait until graceful exit
		if replicas[i].DeletionTimestamp != nil && !burstable {
			logger.V(4).Info("App is waiting for pod to terminate", "app", klog.KObj(app), "pod", klog.KObj(replicas[i]))
			allAvailable = false
			break
		}
		if replicas[i].DeletionTimestamp != nil && burstable {
			logger.V(4).Info("App pod is terminating, skip this loop", "app", klog.KObj(app), "pod", klog.KObj(replicas[i]))
			break
		}

		isAvailable, checkInterval := getPodAvailableAndNextCheckInterval(replicas[i], minReadySeconds)
		if !burstable {
			// not burstable and a pod is unavailable, preceding procedures can't be done
			if !isAvailable {
				allAvailable = false
				if checkInterval > 0 {
					// check it next reconcile
					requeue_duration.Push(GetAppKey(app), checkInterval)
					logger.V(4).Info("waiting for pod to be available after ready for minReadySeconds",
						"app", klog.KObj(app), "waitTime", checkInterval, "pod", klog.KObj(replicas[i]),
						"minReadySeconds", minReadySeconds)
				} else {
					logger.V(4).Info("waiting for pod to be available", "app", klog.KObj(app), "pod", klog.KObj(replicas[i]))
				}
				break
			}
		} else if !isAvailable {
			// burstable but unavailable
			logger.V(4).Info("app pod is unavailable, skip this loop", "app", klog.KObj(app), "pod", klog.KObj(replicas[i]))
			if checkInterval > 0 {
				// check it next reconcile
				requeue_duration.Push(GetAppKey(app), checkInterval)
			}
			break
		}

		// if pod identity mismatch, update its identity
		if !matchAppAndPod(app, replicas[i]) || !matchAppPVC(app, replicas[i]) {
			// avoid changing shared cache
			replica := replicas[i].DeepCopy()
			// update pod's identity to match the app
			if err := c.podController.UpdateStatefulPod(ctx, app, replica); err != nil {
				condition := apps.StatefulSetCondition{
					Type:    unicore.ConditionFailUpdatePod,
					Status:  v1.ConditionTrue,
					Message: fmt.Sprintf("failed to update pod %s: %v", replicas[i].Name, err),
				}
				setAppCondition(&status, condition)
				runErr = err
				break
			}
		}
	}

接下来根据 allAvailable 的结果,如果所有 replica list 的 pod 都 ready 了,就可以处理 invalidPods 了。如果非并发模式,需要控制每次只能删除一个,且删除时其前序 pod 都 ready。

	if runErr != nil || !allAvailable {
		updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, invalidPods)
		return &status, runErr
	}

	// at this point, if not burstable, all pods of spec.Replicas are available.
	// then we can make deletion of invalid pods in a decreasing order, if all predecessors are ready
	shouldExit := false
	for i := range invalidPods {
		pod := invalidPods[i]
		if pod.DeletionTimestamp != nil {
			if !burstable {
				// if not burstable, exit and wait until pod terminated
				shouldExit = true
				logger.V(4).Info("waiting for pod to terminate before scaling down", "app",
					klog.KObj(app), "pod", klog.KObj(pod))
				break
			}
		}
		// not burstable: if it's not the first unhealthy pod, exit until predecessors are ready
		if !burstable && !getPodReady(pod) && pod != firstUnhealthyPod {
			logger.V(4).Info("waiting for preceding pod to be ready before scaling down", "app",
				klog.KObj(app), "pod", klog.KObj(pod), "preceding-pod", klog.KObj(firstUnhealthyPod))
			shouldExit = true
			break
		}

		klog.Info("pod is terminating for scale down app ", klog.KObj(app), " 's pod ", klog.KObj(pod))
		runErr = c.podController.DeleteStatefulPod(ctx, app, invalidPods[i])
		if runErr != nil {
			c.recorder.Eventf(app, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", invalidPods[i].Name, runErr)
			shouldExit = true
			break
		}

		if !burstable {
			shouldExit = true
			break
		}
	}

至此为止,在数量上实际 pods 已经和 app.Spec 要求的对应了。下面就更新一下 app.Status,执行 pod 更新流程。当然,如果 app 更新策略采用的是 onDelete,那么需要等待用户手动删除对应的 pod 来进行更新,这里就可以直接 return 了。

	updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, invalidPods)
	if shouldExit || runErr != nil {
		return &status, runErr
	}

	// for onDelete strategy, pods will only be updated when manually deleted
	if app.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
		return &status, nil
	}

	if app.DeletionTimestamp != nil {
		// app is being deleted
		return &status, nil
	}

最后就是 rollingUpdate 滚动更新策略的内容,我们每次找到序号最大的 revision 与 updateRevision 不同的 pod,将其删除,等待下次 reconcile 时在前述流程创建。如果 app 用默认方式创建没有 rollingUpdate 字段,则手动为其补上。如果 rollingUpdate.Partition 指定了最低更新 partition,则只更新序号在 [partition:] 的 pod;如果序号更高的 pod 还未 ready,则需要 return 等待其 ready 后再继续下一个。

// for rollingUpdate strategy, we terminate the pod with the largest ordinal that does not match the updateRevision
	updateMin := 0

	if app.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType && app.Spec.UpdateStrategy.RollingUpdate == nil {
		zero := int32(0)
		app.Spec.UpdateStrategy.RollingUpdate = &unicore.RollingUpdateAppStrategy{
			Partition:             &zero,
			MaxUnavailable:        &intstr.IntOrString{Type: intstr.Int, IntVal: 1},
			PodUpdatePolicy:       unicore.RecreatePodUpdateStrategyType,
			Paused:                false,
			UnorderedUpdate:       false,
			InPlaceUpdateStrategy: nil,
			MinReadySeconds:       &zero,
		}
		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			app, err = c.client.UnicoreV1().Apps(app.Namespace).Update(ctx, app, metav1.UpdateOptions{})
			if err != nil {
				return err
			}
			klog.Infof("added default rollingUpdate strategy to app %v", app.Name)
			return nil
		})
		if err != nil {
			klog.Infof("failed adding default rollingUpdate strategy to app %v: %v", app.Name, err)
		}
		return &status, nil
	}

	// update can only be done for pod [partition:]
	updateMin = int(*app.Spec.UpdateStrategy.RollingUpdate.Partition)
	for target := len(replicas) - 1; target >= updateMin; target-- {
		if getPodRevision(replicas[target]) != updateRevision.Name && replicas[target].DeletionTimestamp == nil {
			logger.V(4).Info("terminating pod for update", "app", klog.KObj(app), "pod", klog.KObj(replicas[target]))
			if err := c.podController.DeleteStatefulPod(ctx, app, replicas[target]); err != nil {
				if !errors.IsNotFound(err) {
					return &status, err
				}
			}
			status.CurrentReplicas--
			return &status, err
		}

		// wait for unhealthy pods to update
		if !(getPodReady(replicas[target]) || replicas[target].DeletionTimestamp != nil) {
			logger.V(4).Info("waiting for pod to update", "app", klog.KObj(app), "pod", klog.KObj(replicas[target]))
			return &status, nil
		}
	}

	return &status, nil

至此我们就实现了 stateController。虽然这里只实现了 update 方法,但其实按照 使满足预期状态的思想,已经能够满足对 app 的 CRUD 四种操作了。

Reconcile 逻辑

接下来就可以回到一开始 AppController 这里自动生成的 reconcile 函数了。因为我们在注册控制器时就指定了要 watch App 和所有的 pod 的变动,所以每次涉及其更新时 controller manager 就会自动调用 reconcile 函数,并通过入参 req 传递对应 app 的 namespace 和 name。因此这里首先检查传入 app 是否存在,如果存在则拉取其管理的 pod 状态,调用刚刚实现的 UpdateApp 方法,对照其与 app spec 规定的是否对应并做出修改。

注意到原版的 sts 在删除时不会考虑到 pod 的删除顺序,在 deleteTimestamp != nil 时直接 return,交由 apiserver 的垃圾回收机制直接同时删除所有 pod。所以我们做一个改动,使我们的 app 在被删除时也能保持序号顺序非并行删除,也就是缩容到0后再在 etcd 删除 sts 对象。这样的好处是避免不按顺序退出导致 pvc 中的持久化产物受损。

要做到这一点,需要利用到 k8s 资源的 finalizer 特性。finalizer 本质就是一个 []string,作为一种延迟删除的手段使用,如果其不为空,那在资源被删除时只会修改其 deletionTimestamp,而不会将其删除,直到其 finalizer 为空。因此我们可以在 Reconcile 逻辑中为 cr Update 上一个我们自己的 finalizer,然后在所有 pod 都被删除后将其移除,使得 app 可以被正常删除:

func (r *AppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, retErr error) {
	_ = log.FromContext(ctx)

	app, err := r.Lister.Apps(req.Namespace).Get(req.Name)
	if errors.IsNotFound(err) {
		klog.InfoS("app has been deleted", "app", req, "timeCost", time.Since(startTime))
		return reconcile.Result{}, nil
	}
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("get app err:%v", err))
		return reconcile.Result{}, err
	}

	// add finalizer so that pods won't be deleted by recycling, but by the reconcile func
	// finalizer can only be added when the app is not being deleted
	if app.ObjectMeta.DeletionTimestamp == nil {
		hasFinalizer := false
		for _, v := range app.ObjectMeta.Finalizers {
			if v == appFinalizerName {
				hasFinalizer = true
				break
			}
		}
		if !hasFinalizer {
			app.ObjectMeta.Finalizers = append(app.ObjectMeta.Finalizers, appFinalizerName)
			err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
				app, err = r.UnicoreCli.UnicoreV1().Apps(app.Namespace).Update(ctx, app, metav1.UpdateOptions{})
				if err != nil {
					return err
				}
				klog.Infof("added finalizer for app %v", app.Name)
				return nil
			})
			return reconcile.Result{}, err
		}
	}

	selector, err := metav1.LabelSelectorAsSelector(app.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("parse selector err:%v", err))
		// no need to retry this
		return reconcile.Result{}, nil
	}

	if err := r.adoptOrphanRevisions(app); err != nil {
		return reconcile.Result{}, err
	}

	pods, err := r.getAppPods(ctx, app, selector)
	if err != nil {
		return reconcile.Result{}, err
	}

	if app.ObjectMeta.DeletionTimestamp != nil {
		// if app is being deleted and finalizer seen, remove the finalizer after gracefully deleting the pods
		if app.Spec.Replicas == nil || *app.Spec.Replicas != 0 {
			// to delete the app, we scale it down to zero
			zero := int32(0)
			app.Spec.Replicas = &zero
			err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
				app, err = r.UnicoreCli.UnicoreV1().Apps(app.Namespace).Update(ctx, app, metav1.UpdateOptions{})
				if err != nil {
					return err
				}
				klog.Infof("set replica to 0 for app %v", app.Name)
				return nil
			})
			return reconcile.Result{}, err
		}

		defer func() {
			if retErr == nil {
				if app.Name == "" {
					return
				}

				newFinalizers := make([]string, 0)
				removed := false
				for _, v := range app.ObjectMeta.Finalizers {
					if v != appFinalizerName {
						newFinalizers = append(newFinalizers, v)
					} else {
						removed = true
					}
				}

				if !removed || len(pods) > 0 {
					klog.Infof("waiting for app's %d pod to terminate: %v", len(pods), app.Name)
					return
				}
				// we can now remove the finalizer to complete deletion after all pods are gone
				app.ObjectMeta.Finalizers = newFinalizers
				err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
					app, err = r.UnicoreCli.UnicoreV1().Apps(app.Namespace).Update(ctx, app, metav1.UpdateOptions{})
					if err != nil {
						return err
					}
					klog.Infof("removed finalizer for app %v", app.Name)
					return nil
				})
				if err != nil {
					klog.ErrorS(err, "failed to remove finalizer for app", "app", app)
				}
			}
		}()
	}

	if err := r.StateController.UpdateApp(ctx, app, pods); err != nil {
		return reconcile.Result{RequeueAfter: requeue_duration.Pop(unicoreApp.GetAppKey(app))}, err
	}
	klog.V(4).InfoS("sync app succeeded", "app", req, "timeCost", time.Since(startTime))
	return ctrl.Result{}, nil
}

至此 sts 整个生命周期都已被正确维护,且在删除时能够一直 hold 直到所有 pod 正确终止。

测试运行

由于使用 kube-builder 构建,所以用自动生成的 makefile 就可以为集群部署 CRD、直接在本地启动 controller,还可以直接打包和 push 镜像。

首先使用 make install,把我们定义的 CRD 和相关的一些资源安装到集群上:

[root@McYouYouDevServer deployer]# make install
/root/tk/unicore/deployer/bin/controller-gen-v0.15.0 rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
/root/tk/unicore/deployer/bin/kustomize-v5.4.1 build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/apps.unicore.mcyou.cn created
customresourcedefinition.apiextensions.k8s.io/deployers.unicore.mcyou.cn created

之后就可以在 api-versions 里看到自己定义的 version 了:

[root@McYouYouDevServer ~]# kubectl api-versions | grep mc
unicore.mcyou.cn/v1

controller 本身是一套完整的微服务,包含启动用的 main.go 等,所以是可以让它直接在本地的代码跑起来从而进行调试的,不需要打包镜像到环境里面再测试。

使用 make run,就可以直接编译运行:

[root@McYouYouDevServer deployer]# make run
/root/tk/unicore/deployer/bin/controller-gen-v0.15.0 rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
/root/tk/unicore/deployer/bin/controller-gen-v0.15.0 object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go run ./cmd/main.go
2024-11-27T19:49:51+08:00       INFO    setup   starting manager
2024-11-27T19:49:51+08:00       INFO    starting server {"name": "health probe", "addr": "[::]:8081"}
2024-11-27T19:49:51+08:00       INFO    Starting EventSource    {"controller": "deployer", "controllerGroup": "unicore.mcyou.cn", "controllerKind": "Deployer", "source": "kind source: *v1.Deployer"}
2024-11-27T19:49:51+08:00       INFO    Starting EventSource    {"controller": "app", "controllerGroup": "unicore.mcyou.cn", "controllerKind": "App", "source": "kind source: *v1.App"}
2024-11-27T19:49:51+08:00       INFO    Starting Controller     {"controller": "app", "controllerGroup": "unicore.mcyou.cn", "controllerKind": "App"}
2024-11-27T19:49:51+08:00       INFO    Starting Controller     {"controller": "deployer", "controllerGroup": "unicore.mcyou.cn", "controllerKind": "Deployer"}
2024-11-27T19:49:51+08:00       INFO    Starting workers        {"controller": "app", "controllerGroup": "unicore.mcyou.cn", "controllerKind": "App", "worker count": 1}
2024-11-27T19:49:51+08:00       INFO    Starting workers        {"controller": "deployer", "controllerGroup": "unicore.mcyou.cn", "controllerKind": "Deployer", "worker count": 1}

这样就跑起来了。

下面我们可以用 yaml 和 kubectl 自己新建一个 App 试一下。在 /config/samples 下面 kubebuilder 已经生成了一个例子,但是可以扩展一下:

apiVersion: unicore.mcyou.cn/v1
kind: App
metadata:
  labels:
    app.kubernetes.io/name: deployer
    app.kubernetes.io/managed-by: kustomize
    app: nginx
  name: app-sample
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
        - name: nginx
          image: nginx:latest
          ports:
            - containerPort: 80
          volumeMounts:
            - name: nginx-data
              mountPath: /usr/share/nginx/html
  volumeClaimTemplates:
    - metadata:
        name: nginx-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 1Gi

然后用 kubectl 去 apply:

[root@McYouYouDevServer deployer]# kubectl apply -f config/samples/unicore_v1_app.yaml 
app.unicore.mcyou.cn/app-sample created
[root@McYouYouDevServer deployer]# kubectl get app
NAME         AGE
app-sample   3s

然后就可以在 pod 列表里看到对应的 pod 了:

NAME                     READY   STATUS    RESTARTS   AGE
app-sample-0             1/1     Running   0          6s
app-sample-1             1/1     Running   0          5s
app-sample-2             1/1     Running   0          4s

根据时间可以看到顺序启动正常运行。

接下来用 kubectl edit app app-sample 来修改这个 app 的 spec,将其镜像修改成 tomcat,观察其表现:

NAME                     READY   STATUS              RESTARTS   AGE
app-sample-0             0/1     ContainerCreating   0          1s
app-sample-1             1/1     Running             0          3s
app-sample-2             1/1     Running             0          6s

可以看到 pod 从高序号开始终止、重新创建,且查看单个 pod 发现镜像已经改变,符合预期。

然后继续用 kubectl edit 修改,这次修改 replicas 副本数量,先修改为1测试缩容:

NAME                     READY   STATUS    RESTARTS   AGE
app-sample-0             1/1     Running   0          4m2

可以看到剩下两个 pod 都从高到低按顺序退出,只剩下当前这一个。再测试扩容:

NAME                     READY   STATUS    RESTARTS   AGE
app-sample-0             1/1     Running   0          4m47s
app-sample-1             1/1     Running   0          5s
app-sample-2             1/1     Running   0          3s

按顺序扩容,符合预期。也可以额外测试同时修改副本数和镜像,会发现是先执行扩缩容逻辑,然后再执行更新镜像逻辑,符合预期。

问题记录

“no kind is registered for the type v1.App in scheme xxx"

这是因为 client-go 在使用时会默认维护一个自己的全局 scheme,而不是直接使用 mgr 的 scheme。code-generator 默认不会为 clientgo 的 scheme 添加自定义的 CRD 注册,所以这里需要手动增加为 client-go 的全局 scheme 注册上。

func init() {
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))

    utilruntime.Must(unicorev1.AddToScheme(scheme))
    utilruntime.Must(unicorev1.AddToScheme(clientgoscheme.Scheme)) // 这句不会自动生成,如果不加会报错 “no kind is registered for the type v1.App in scheme \"pkg/runtime/scheme.go:100\”
    scheme.AddUnversionedTypes(metav1.SchemeGroupVersion, &metav1.UpdateOptions{}, &metav1.DeleteOptions{}, &metav1.CreateOptions{})
    // +kubebuilder:scaffold:scheme
}