转载自:
一、背景介绍
CRD
即CustomResourceDefinition
,是 kubernetes 极力推荐的资源扩展方式
;基于 CRD 技术,用户能将自定义资源注册到 kubernetes 系统,并像使用原生资源(如 pod、statefulset )一样对自定义资源进行创建、查看、修改、删除
等操作;CRD 是对 kubernetes 进行功能拓展的友好方式,实现了类似于插件式的功能增强!
二、sample-controller 的简单试用
sample-controller
是 kubernetes 官方提供的 CRD Controller 样例实现,
Git 地址:https://github.com/kubernetes/sample-controller
添加自定义控制器 Bar 示例代码:https://github.com/SataQiu/sample-controller/tree/bar
下面,我们就以该样例程序为例,研究如何实现自定义 CRD 控制器,那么先把它用起来看看效果!
2.1 准备 kubernetes 运行环境
我这里用的 kind,安装比较简单,可参考博文《使用 kind 快速部署 Kubernetes 实验集群》
注意,上边博文没有安装网络,可以通过如下命令安装 weave 网络插件:
# kubectl apply -f "https://cloud.weave.works/k8s/net?k8s-version=$(kubectl version | base64 | tr -d '\n')"
2.2 下载 sample-controller 源码到 $GOPATH/src/k8s.io 路径下
# git clone https://github.com/kubernetes/sample-controller.git
2.3 编译 sample-controller
# go build -o sample-controller .
2.4 运行 sample-controller
# ./sample-controller -kubeconfig=$HOME/.kube/kind-config-1
2.5 创建/注册 CRD 资源
# kubectl create -f artifacts/examples/crd.yaml
2.6 创建 Foo 对象
# kubectl create -f artifacts/examples/example-foo.yaml
2.7 查看 Foo 的部署情况
# kubectl get deployments
三、源码分析
3.1 首先分析下 CRD 资源定义
文件位置 artifacts/examples/crd.yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: foos.samplecontroller.k8s.io
spec:
group: samplecontroller.k8s.io
version: v1alpha1
names:
kind: Foo
plural: foos
scope: Namespaced
该定义文件,声明了一种名为 Foo 的资源,告诉 API Server,有一种资源叫做 Foo
该 Foo 资源将被 sample-controller 所监听,并对其相关事件进行处理
3.2 然后,看下创建 Foo 资源的 YAML 定义
文件位置 artifacts/examples/example-foo.yaml
apiVersion: samplecontroller.k8s.io/v1alpha1
kind: Foo
metadata:
name: example-foo
spec:
deploymentName: example-foo
replicas: 1
该文件声明要创建的资源为 Foo 类型,副本数为 1
这个创建事件会被 sample-controller 拦截和处理
3.3 最后,重点分析 sample-controller 的实现逻辑
a. 首先,找到入口函数 sample-controller/main.go#main
func main() {
flag.Parse()
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
exampleClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building example clientset: %s", err.Error())
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
大体流程就是:
- > 读取 kubeconfig 配置,构造用于事件监听的 Kubernetes Client
这里创建了两个,一个监听普通事件,一个监听 Foo 事件
- > 基于 Client 构造监听相关的 informer
- > 基于 Client、Informer 初始化自定义 Controller,监听 Deployment 以及 Foos 资源变化
- > 开启 Controller
b. 紧接着,我们看下 Controller 是如何处理事件的
代码位置 sample-controller/controller.go
首先,看下 Controller 的结构定义
// Controller is the controller implementation for Foo resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
// sampleclientset is a clientset for our own API group
sampleclientset clientset.Interface
deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
foosLister listers.FooLister
foosSynced cache.InformerSynced
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
}
Controller 的关键成员即两个事件的 Listener(appslisters.DeploymentLister、listers.FooLister
)
这两个成员将由 main 函数传入参数进行初始化
此外,为了缓冲事件处理,这里使用队列暂存事件,相关成员即为 workqueue.RateLimitingInterface
record.EventRecorder 用于记录事件
接着,分析 Controller 的构造过程,代码如下:
func NewController(
kubeclientset kubernetes.Interface,
sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
fooInformer informers.FooInformer) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
klog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
recorder: recorder,
}
klog.Info("Setting up event handlers")
// Set up an event handler for when Foo resources change
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
// Set up an event handler for when Deployment resources change. This
// handler will lookup the owner of the given Deployment, and if it is
// owned by a Foo resource will enqueue that Foo resource for
// processing. This way, we don't need to implement custom logic for
// handling Deployment resources. More info on this pattern:
// https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Deployment will always have different RVs.
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
return controller
}
初始化 Controller 大体过程如下:
- > 将 sample-controller 的类型信息(Foo)添加到默认 Kubernetes Scheme,以便能够记录到其事件
- > 基于新 Scheme 创建一个事件记录 recorder ,用于记录来自 “sample-controller” 的事件
- > 基于函数入参及刚刚构造的 recorder,初始化 Controller
- > 设置对 Foo 资源变化的事件处理函数(Add、Update 均通过 enqueueFoo 处理)
- > 设置对 Deployment 资源变化的事件处理函数(Add、Update、Delete 均通过 handleObject 处理)
- > 返回初始化的 Controller
进一步,分析 enqueueFoo 以及 handleObject 的实现
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
}
// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}
klog.V(4).Infof("Processing object: %s", object.GetName())
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
// If this object is not owned by a Foo, we should not do anything more
// with it.
if ownerRef.Kind != "Foo" {
return
}
foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
return
}
c.enqueueFoo(foo)
return
}
}
enqueueFoo 就是解析 Foo 资源为 namespace/name 形式的字符串
,然后入队handleObject 监听了所有实现了 metav1 的资源
,但只过滤出 owner 是 Foo 的
,将其解析为 namespace/name入队
小结:在构造 Controller 时就已经初始化好事件收集这部分的工作了
那如何处理队列里的这些事件呢?
来到 Run 函数的定义处
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
// Start the informer factories to begin populating the informer caches
klog.Info("Starting Foo controller")
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
klog.Info("Starting workers")
// Launch two workers to process Foo resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
return nil
}
Run 函数的执行过程大体如下:
-
等待 Informer 同步完成
-
并发
runWorker
,处理队列内事件
runWorker 的定义
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
processNextWorkItem
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
processNextWorkItem
的处理流程大体如下:
- > 从队列取出待处理对象
- > 调用syncHandler
处理
再来分析 syncHandler 的处理细节
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get the Foo resource with this namespace/name
foo, err := c.foosLister.Foos(namespace).Get(name)
if err != nil {
// The Foo resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
return nil
}
return err
}
deploymentName := foo.Spec.DeploymentName
if deploymentName == "" {
// We choose to absorb the error here as the worker would requeue the
// resource otherwise. Instead, the next time the resource is updated
// the resource will be queued again.
utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
return nil
}
// Get the deployment with the name specified in Foo.spec
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
}
// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and ret
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
// If this number of the replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
}
// If an error occurs during Update, we'll requeue the item so we can
// attempt processing again later. THis could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
// Finally, we update the status block of the Foo resource to reflect the
// current state of the world
err = c.updateFooStatus(foo, deployment)
if err != nil {
return err
}
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
syncHandler 的处理逻辑大体如下:
- > 根据 namespace/name 获取 foo 资源
- > 根据 foo,获取其 Deployment 名称,进而获取 deployment 资源(没有就为其创建)
- > 根据 foo 的 Replicas 更新 deployment 的 Replicas
(如果不匹配)
- > 更新 foo 资源的状态为最新 deployment 的状态(其实就是 AvailableReplicas)
由此,可知 foo 的实现实体其实就是 deployment
看下 deployment 的实现代码
// newDeployment creates a new Deployment for a Foo resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the Foo resource that 'owns' it.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
labels := map[string]string{
"app": "nginx",
"controller": foo.Name,
}
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: foo.Spec.DeploymentName,
Namespace: foo.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(foo, schema.GroupVersionKind{
Group: samplev1alpha1.SchemeGroupVersion.Group,
Version: samplev1alpha1.SchemeGroupVersion.Version,
Kind: "Foo",
}),
},
},
Spec: appsv1.DeploymentSpec{
Replicas: foo.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
},
},
}
}
简单逻辑就是根据 foo 资源的 namespace、name、deploymentname、replicas 信息创建 nginx deployment 而已
需要注意的是 OwnerReferences 里需要与 Foo 类型绑定(Group、Version、Kind)
,
主要是要与采集处匹配,因为 handleObject 中的筛选 Foo 资源代码是根据 Kind 值做的
if ownerRef.Kind != "Foo" {
return
}
c. 自定义 Controller 是如何与 crd.yaml 定义关联的?
我们知道,一开始是通过 crd.yaml 来通告 kubernetes 我们自定义资源的 scheme 的,那是如何与 Controller 关联的呢?其实就在于 pkg/apis 目录下pkg/apis 下定义了自定义资源的相关属性信息,我们简单看下:
pkg/samplecontroller/v1alpha1/register.go(处理类型 Schema)
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Foo{},
&FooList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
与之前的 crd 定义对比下
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: foos.samplecontroller.k8s.io
spec:
group: samplecontroller.k8s.io
version: v1alpha1
names:
kind: Foo
plural: foos
scope: Namespaced
会发现 controller 与 crd 两者的 group、version 都是一致的
,而且 metadata 的 name 是符合 <plural>.<group> 规范的
在 k8s 系统中,一旦创建了 CRD,对该 CRD 的增删改查其实就已经被支持了,我们的 Controller 只是监听自己感兴趣的资源事件,做出真实的部署、更新、移除等动作