通过 Kubernetes Go Client 实现 SSA

更新 K8s 对象

在 K8s 更新资源操作有两种形式:Replace 和 Patch。

Replace 形式主要用于替换对象 spec,这是一个 read-then-write 操作,即你要先从 K8s 获取对象,修改 spec,再提交更新。这个过程中对象其他字段不需要特意维护,提交的 apiVersionkindmetadata 中提供的信息会用于确定对象,而 status 则会被 K8s 忽略(如果需要更新 status,需要用专用的操作)。

在对象 metadata 中有一个 resourceVersion 字段,用来追踪持久化对象的变动,每次持久化对象发生变化时 resourceVersion 都会改变。Replace 隐含的竞争策略乐观锁就是依靠这个字段实现的,当操作被提交到 K8s 但 resourceVersion 不匹配时,操作会失败。

Patch 操作直接在对象上修改指定的字段,并且通过 Patch 类型确定修改的方式(合并还是覆盖)。和 Replace 不同的是,Patch 操作竞争策略是 last-write-wins,这也意味着不需要先读取对象就可以修改对象。

Patch 类型

Patch 类型有 4 种:JSON PatchJSON Merge PatchStrategic Merge PatchApply Patch

  • Strategic Merge Patch 根据各个字段预设的 Patch 策略(可以在 API 文档或者源码中发现 patchStrategy 字样内容)合并修改内容,是 kubectl patch 默认的 Patch 类型
  • JSON PatchJSON Merge Patch 参考文章 JSON Patch and JSON Merge Patch
  • Apply Patch 指的就是 Server-Side Apply

官方文档 Declarative Management of Kubernetes Objects Using Configuration Files 描述了 kubectl 如何利用 kubectl.kubernetes.io/last-applied-configuration 实现 Client-Side Apply,这里不展开说明。

Server-Side Apply

SSA(Server-Side Apply) 是在 K8s 1.22 GA 的特性,可以让控制器或者客户端在管理对象时只需要关注自己关心的字段,将对象改动的合并在服务端完成,并且(在 K8s 1.18 beta)引入 Field Management 机制,通过让字段所有权(ownership)和 fieldManager 关联的方式解决冲突,尤其当有多个控制器或者客户端管理同一个对象时,能尽量的避免冲突的发生。而当一个 fieldManager 尝试 apply 对象但有字段的所有权不属于自己时,操作就会冲突。

Field Management 满足以下规则:

  • 如果 apply 一个改动,则必须保证修改者已拥有字段所有权,否则操作冲突
  • 在同一个字段上 apply 相同的值(没有改动),字段所有权会被共享,共享后任何一个管理者改动字段都会冲突
  • 成功改动一个字段后字段所有权会被转移到修改者(包括 force SSA 和非 SSA 的 update 操作),其他 fieldManager 会被移除
  • 从 apply 的配置里移除已拥有所有权的字段会释放对应所有权,没有所有者的字段可能被移除或者重置为默认值

字段所有权维护在 K8s 对象 metadata.managedFields 里,通常情况下不建议操作这个字段的内容,但是官方文档里也提到了你可以在需要的时候维护它。

可以通过以下命令查看一个 Pod 的 managedFields

1
kubectl get --show-managed-fields -o yaml pods <pod-name>

目前 kubectl 默认还是 client-side apply,如果你想尝试 SSA,可以通过 kubectl apply --server-side

通过 k8s.io/client-go 实现 SSA

(写到这里,我已经没耐心写了。)

GVK 和 GVR

在 K8s 中,一个带命名空间的资源类型实例 URI 大概是这样的:/apis/GROUP/VERSION/namespaces/NAMESPACE/RESOURCETYPE/NAME,其中 GROUPVERSIONRESOURCETYPE 合起来简称 GVR,用来标识资源类型,一个 GVR 要么是集群范围,要么是命名空间范围的。

另一个概念叫 GVK,即 GroupVersionKind,用于标识 Kind,例如 K8s 对象中的 apiVersionkind 字段。

GVK 和 GVR 存在一定映射关系,称为 REST mapping,可以通过命令 kubectl api-resources 查看。在使用客户端开发时,需要通过 k8s.io/client-go/discoveryk8s.io/client-go/restmapper 完成 GVK 和 GVR 相互映射。

dynamic client 和 unstructured

k8s.io/client-go 有两种风格的客户端,一种是 k8s.io/client-go/kubernetesClientset,通过结构化的方式访问 K8s API(可以在 k8s.io/api 中查看),在 k8s.io/apimachinery 中预设了大量资源对应对象的类型,并且 KindResource Type 的映射也由客户端自己(手工)维护了,但这不符合 SSA 的场景;另一种就是 k8s.io/client-go/dynamic 提供的接口,这些接口使用了 unstructured 类型,通过动态的数据结构,可以灵活填充自己需要的数据。

(我真的不知道这些东西有什么好说的)

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main

import (
"context"
"encoding/json"
"flag"
"os"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)

var kube struct {
di dynamic.Interface
mapper *restmapper.DeferredDiscoveryRESTMapper
fieldManagerName string
}

func geteResourseInterface(gvk *schema.GroupVersionKind, ns string) (dynamic.ResourceInterface, error) {
gvr, err := kube.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
var ri dynamic.ResourceInterface
if gvr.Scope.Name() == meta.RESTScopeNameNamespace {
ri = kube.di.Resource(gvr.Resource).Namespace(ns)
} else {
ri = kube.di.Resource(gvr.Resource)
}
return ri, nil
}

func applyYaml(fname string) (*unstructured.Unstructured, error) {
data, err := os.ReadFile(fname)
if err != nil {
return nil, err
}
dec := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)
var obj unstructured.Unstructured
_, gvk, err := dec.Decode(data, nil, &obj)
if err != nil {
return nil, err
}

err = unstructured.SetNestedField(obj.Object, time.Now().Format(time.RFC3339), "spec", "template", "metadata", "annotations", kube.fieldManagerName+"/deployAt")
if err != nil {
return nil, err
}

objData, err := runtime.Encode(unstructured.UnstructuredJSONScheme, &obj)
if err != nil {
return nil, err
}

ri, err := geteResourseInterface(gvk, obj.GetNamespace())
if err != nil {
return nil, err
}

_, err = ri.Patch(context.Background(), obj.GetName(), types.ApplyPatchType, objData, metav1.PatchOptions{
FieldManager: kube.fieldManagerName,
})
if err != nil {
return nil, err
}

return &obj, nil
}

func printObject(name string) error {
ri, err := geteResourseInterface(&schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, "default")
if err != nil {
return err
}
obj, err := ri.Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return err
}

enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
enc.Encode(obj)

return nil
}

func assert(op string, err error) {
if err != nil {
klog.Fatalf("%s: %v", op, err)
}
}

func main() {
var (
kubeConfig string
kubeMaster string
kubeFile string
)
flag.StringVar(&kubeConfig, "kubeconfig", "kube-config.yaml", "k8s config file")
flag.StringVar(&kubeMaster, "master", "", "k8s master url")
flag.StringVar(&kube.fieldManagerName, "fm", "app-deployer", "field manager")
flag.StringVar(&kubeFile, "f", "", "k8s resource yaml file")
klog.InitFlags(flag.CommandLine)
flag.Parse()

if kubeFile == "" {
flag.Usage()
os.Exit(1)
}

cfg, err := clientcmd.BuildConfigFromFlags(kubeMaster, kubeConfig)
assert("clientcmd.BuildConfigFromFlags", err)

klog.Infof("load config %s: host: %s", kubeConfig, cfg.Host)

kube.di, err = dynamic.NewForConfig(cfg)
assert("dynamic.NewForConfig", err)

dc, err := discovery.NewDiscoveryClientForConfig(cfg)
assert("discovery.NewDiscoveryClientForConfig", err)

kube.mapper = restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))

obj, err := applyYaml(kubeFile)
assert("applyYaml", err)

klog.Infof("apply ok")

err = printObject(obj.GetName())
assert("printObject", err)
}

结尾

在 K8s 里,一个 K8s 对象表示 record of intent,其中 spec 表示期望状态,status 表示当前状态,而 SSA 则是让多个控制器和客户端在一个 K8s 对象上只需要关注自己的 fully specified intent

大概就是这样。

参考资料

参考的官方文档来自 K8s v1.24。