Kubernetes编写自定义controller

Posted by 爱折腾的工程师 on Monday, January 14, 2019

简介

client-go是kubernetes官方的一个通用库,通过它可以很容易实现自定义controller.

关于如何使用client-go,参考徐超大神的分享:使用 client-go 控制原生及拓展的 Kubernetes API

client-go & controller架构设计

client-go库包含开发自定义控制器时可以使用的各种机制,这些机制在库的tools/cache文件夹中定义。

来自kubernetes官方github的一张图:

如图所示,图中的组件分为client-go和custom controller两部分:

  1. client-go部分

    • Reflector: 监视特定资源的k8s api, 把新监测的对象放入Delta Fifo队列,完成此操作的函数是ListAndWatch。
    • Informer: 从Delta Fifo队列拿出对象,完成此操作的函数是processLoop。
    • Indexer: 提供线程级别安全来存储对象和key。
  2. custom-controller部分

    • Informer reference: Informer对象引用
    • Indexer reference: Indexer对象引用
    • Resource Event Handlers: 被Informer调用的回调函数,这些函数的作用通常是获取对象的key,并把key放入Work queue,以进一步做处理。
    • Work queue: 工作队列,用于将对象的交付与其处理分离,编写Resource event handler functions以提取传递的对象的key并将其添加到工作队列。
    • Process Item: 用于处理Work queue中的对象,可以有一个或多个其他函数一起处理;这些函数通常使用Indexer reference或Listing wrapper来检索与该键对应的对象。

client-go官方代码例子

package main

import (
    "flag"
    "fmt"
    "time"

    "k8s.io/klog"

    "k8s.io/api/core/v1"
    meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

// 定义一个结构体Controller
type Controller struct {
    indexer  cache.Indexer
    queue    workqueue.RateLimitingInterface
    informer cache.Controller
}

// 获取controller的函数
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
    return &Controller{
        informer: informer,
        indexer:  indexer,
        queue:    queue,
    }
}

// 处理workqueue中的对象
func (c *Controller) processNextItem() bool {
    // Wait until there is a new item in the working queue
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    // Tell the queue that we are done with processing this key. This unblocks the key for other workers
    // This allows safe parallel processing because two pods with the same key are never processed in
    // parallel.
    defer c.queue.Done(key)

    // Invoke the method containing the business logic
    err := c.syncToStdout(key.(string))
    // Handle the error if something went wrong during the execution of the business logic
    c.handleErr(err, key)
    return true
}

// syncToStdout is the business logic of the controller. In this controller it simply prints
// information about the pod to stdout. In case an error happened, it has to simply return the error.
// The retry logic should not be part of the business logic.
func (c *Controller) syncToStdout(key string) error {
    obj, exists, err := c.indexer.GetByKey(key)
    if err != nil {
        klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
        return err
    }

    if !exists {
        // Below we will warm up our cache with a Pod, so that we will see a delete for one pod
        fmt.Printf("Pod %s does not exist anymore\n", key)
    } else {
        // Note that you also have to check the uid if you have a local controlled resource, which
        // is dependent on the actual instance, to detect that a Pod was recreated with the same name
        fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
    }
    return nil
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *Controller) handleErr(err error, key interface{}) {
    if err == nil {
        // Forget about the #AddRateLimited history of the key on every successful synchronization.
        // This ensures that future processing of updates for this key is not delayed because of
        // an outdated error history.
        c.queue.Forget(key)
        return
    }

    // This controller retries 5 times if something goes wrong. After that, it stops trying.
    if c.queue.NumRequeues(key) < 5 {
        klog.Infof("Error syncing pod %v: %v", key, err)

        // Re-enqueue the key rate limited. Based on the rate limiter on the
        // queue and the re-enqueue history, the key will be processed later again.
        c.queue.AddRateLimited(key)
        return
    }

    c.queue.Forget(key)
    // Report to an external entity that, even after several retries, we could not successfully process this key
    runtime.HandleError(err)
    klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
    defer runtime.HandleCrash()

    // Let the workers stop when we are done
    defer c.queue.ShutDown()
    klog.Info("Starting Pod controller")

    go c.informer.Run(stopCh)

    // Wait for all involved caches to be synced, before processing items from the queue is started
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    <-stopCh
    klog.Info("Stopping Pod controller")
}

func (c *Controller) runWorker() {
    for c.processNextItem() {
    }
}

func main() {
    var kubeconfig string
    var master string

    // 指定kubeconfig文件
    flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
    flag.StringVar(&master, "master", "", "master url")
    flag.Parse()

    // creates the connection
    config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }

    // creates the clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatal(err)
    }

    // create the pod watcher
    podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

    // create the workqueue
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    // Bind the workqueue to a cache with the help of an informer. This way we make sure that
    // whenever the cache is updated, the pod key is added to the workqueue.
    // Note that when we finally process the item from the workqueue, we might see a newer version
    // of the Pod than the version which was responsible for triggering the update.
    indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(old interface{}, new interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(new)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            // IndexerInformer uses a delta queue, therefore for deletes we have to use this
            // key function.
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    }, cache.Indexers{})

    controller := NewController(queue, indexer, informer)

    // We can now warm up the cache for initial synchronization.
    // Let's suppose that we knew about a pod "mypod" on our last run, therefore add it to the cache.
    // If this pod is not there anymore, the controller will be notified about the removal after the
    // cache has synchronized.
    indexer.Add(&v1.Pod{
        ObjectMeta: meta_v1.ObjectMeta{
            Name:      "mypod",
            Namespace: v1.NamespaceDefault,
        },
    })

    // Now let's start the controller
    stop := make(chan struct{})
    defer close(stop)
    go controller.Run(1, stop)

    // Wait forever
    select {}
}

常用小技巧

特定资源过滤

使用WithTweakListOptions结合LabelSelector或FieldSelector可以针对特定的资源

informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
    informers.WithTweakListOptions(func(options *v1meta.ListOptions) {
        options.LabelSelector = "!" + apis.LabelServiceProxyName
    }))

分页

client-go提供limit和continue字段来实现

GET /api/v1/pods?limit=500
---
200 OK
Content-Type: application/json

{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {
    "resourceVersion":"10245",
    "continue": "ENCODED_CONTINUE_TOKEN",
    ...
  },
  "items": [...] // returns pods 1-500
}
GET /api/v1/pods?limit=500&continue=ENCODED_CONTINUE_TOKEN
---
200 OK
Content-Type: application/json

{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {
    "resourceVersion":"10245",
    "continue": "ENCODED_CONTINUE_TOKEN_2",
    ...
  },
  "items": [...] // returns pods 501-1000
}
GET /api/v1/pods?limit=500&continue=ENCODED_CONTINUE_TOKEN_2
---
200 OK
Content-Type: application/json

{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {
    "resourceVersion":"10245",
    "continue": "", // continue token is empty because we have reached the end of the list
    ...
  },
  "items": [...] // returns pods 1001-1253
}

不足500条记录,continue值为0,不用再继续显示下一页了

client-go实现apply yaml

// convert yaml to runtime.Object
obj, _, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode([]byte(yamlContent), nil, nil)
if err != nil {
   return err
}

// apply resource to k8s cluster
err = applyResource(client, obj)
if err != nil {
   return err
}

func applyResource(client Client, obj runtime.Object) error {
	// convert runtime.Object to unstructured object map
	unstructuredObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
	if err != nil {
		return err
	}
	ctx := context.Background()

	// get GroupVersionKind
	gvk := obj.GetObjectKind().GroupVersionKind()

	// get GroupVersionResource
	gvr, err := getGroupVersionResource(client, gvk)
	if err != nil {
		return err
	}

	// generate unstructured object
	unstructuredObj := &unstructured.Unstructured{
		Object: unstructuredObjMap,
	}

	_, err = client.Dynamic().Resource(gvr).
		Namespace(unstructuredObj.GetNamespace()).
		Apply(ctx, unstructuredObj.GetName(), unstructuredObj, metav1.ApplyOptions{
			FieldManager: unstructuredObj.GetName(),
			Force:        true,
		})
	if err != nil {
		return fmt.Errorf("dynamic resource %s/%s apply failed, %w", gvk.Kind, unstructuredObj.GetName(), err)
	}
	return nil
}

func getGroupVersionResource(client Client, gvk schema.GroupVersionKind) (schema.GroupVersionResource, error) {
	// get GroupVersionResource with specific GroupVersion
	resourceList, err := client.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String())
	if err != nil {
		return schema.GroupVersionResource{}, err
	}

	for _, resource := range resourceList.APIResources {
		if resource.Kind == gvk.Kind {
			return schema.GroupVersionResource{
				Group:    gvk.Group,
				Version:  gvk.Version,
				Resource: resource.Name,
			}, nil
		}
	}
	return schema.GroupVersionResource{}, fmt.Errorf("GroupVersionResource not found for GroupVersionKind: %s", gvk.String())
}

参考链接

「真诚赞赏,手留余香」

爱折腾的工程师

真诚赞赏,手留余香

使用微信扫描二维码完成支付