本节主要介绍 controller-runtime 框架的基本使用与原理。

controller-runtime(https://github.com/kubernetes-sigs/controller-runtime) 框架实际上是社区帮我们封装的一个控制器处理的框架,底层核心实现原理和我们前面去自定义一个 controller 控制器逻辑是一样的,只是在这个基础上新增了一些概念,开发者直接使用这个框架去开发控制器会更加简单方便。包括 kubebuilder、operator-sdk 这些框架其实都是在 controller-runtime 基础上做了一层封装,方便开发者快速生成项目的脚手架而已。下面我们就来分析下 controller-runtime 是如何实现的控制器处理。

Controller 的实现

首先我们还是去查看下控制器的定义以及控制器是如何启动的。控制器的定义结构体如下所示:

// pkg/internal/controller/controller.go

type Controller struct {
	// Name 用于跟踪、记录和监控中控制器的唯一标识,必填字段
	Name string

	// 可以运行的最大并发 Reconciles 数量,默认值为1
	MaxConcurrentReconciles int

	// Reconciler 是一个可以随时调用对象的 Name/Namespace 的函数
  // 确保系统的状态与对象中指定的状态一致,默认为 DefaultReconcileFunc 函数
	Do reconcile.Reconciler

	// 一旦控制器准备好启动,MakeQueue 就会为这个控制器构造工作队列
	MakeQueue func() workqueue.RateLimitingInterface

	// 队列通过监听来自 Infomer 的事件,添加对象键到队列中进行处理
	// MakeQueue 属性就是来构造这个工作队列的
  // 也就是前面我们讲解的工作队列,我们将通过这个工作队列来进行消费
	Queue workqueue.RateLimitingInterface

	// SetFields 用来将依赖关系注入到其他对象,比如 Sources、EventHandlers 以及 Predicates
	SetFields func(i interface{}) error

	// 控制器同步信号量
	mu sync.Mutex

	// 允许测试减少 JitterPeriod,使其更快完成
	JitterPeriod time.Duration

	// 控制器是否已经启动
	Started bool

	// TODO(community): Consider initializing a logger with the Controller Name as the tag

	// startWatches 维护了一个 sources、handlers 以及 predicates 列表以方便在控制器启动的时候启动
	startWatches []watchDescription

	// 日志记录
	Log logr.Logger
}

上面的结构体就是 controller-runtime 中定义的控制器结构体,我们可以看到结构体中仍然有一个限速的工作队列,但是看上去没有资源对象的 Informer 或者 Indexer 的数据,实际上这里是通过下面的 startWatches 属性做了一层封装,该属性是一个 watchDescription 队列,一个 watchDescription 包含了所有需要 watch 的信息:

// pkg/internal/controller/controller.go

// watchDescription 包含所有启动 watch 操作所需的信息
type watchDescription struct {
	src        source.Source
	handler    handler.EventHandler
	predicates []predicate.Predicate
}

整个控制器中最重要的两个函数是 Watch 与 Start,下面我们就来分析下他们是如何实现的。

Watch 函数实现

// pkg/internal/controller/controller.go

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 注入 Cache 到参数中
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	// Controller 还没启动,把 watches 存放到本地然后返回
	//
	// 这些 watches 会被保存到控制器结构体中,直到调用 Start(...) 函数
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}

	c.Log.Info("Starting EventSource", "source", src)
	// 调用 src 的 Start 函数
	return src.Start(evthdler, c.Queue, prct...)
}

上面的 Watch 函数可以看到最终是去调用的 Source 这个参数的 Start 函数,Source 是事件的源,如对资源对象的 Create、Update、Delete 操作,需要由 event.EventHandlersreconcile.Requests 入队列进行处理。

// pkg/source/source.go

type Source interface {
	// Start 是一个内部函数
  // 只应该由 Controller 调用,向 Informer 注册一个 EventHandler
  // 将 reconcile.Request 放入队列
	Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

我们可以看到 source.Source 是一个接口,它是 Controller.Watch 的一个参数,所以要看具体的如何实现的 Source.Start 函数,我们需要去看传入 Controller.Watch 的参数,在 controller-runtime 中调用控制器的 Watch 函数的入口实际上位于 pkg/builder/controller.go 文件中的 doWatch() 函数:

// pkg/builder/controller.go

func (blder *Builder) doWatch() error {
	// Reconcile type
	src := &source.Kind{Type: blder.forInput.object}
	hdler := &handler.EnqueueRequestForObject{}
	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
	err := blder.ctrl.Watch(src, hdler, allPredicates...)
	if err != nil {
		return err
	}
  ......
	return nil
}

可以看到 Watch 的第一个参数是一个 source.Kind 的类型,该结构体就实现了上面的 source.Source 接口:

// pkg/source/source.go

// Kind 用于提供来自集群内部的事件源,这些事件来自于 Watches(例如 Pod Create 事件)
type Kind struct {
	// Type 是 watch 对象的类型,比如 &v1.Pod{}
	Type runtime.Object

	// cache 用于 watch 的 APIs 接口
	cache cache.Cache
}

// 真正的 Start 函数实现
func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {

	// Type 在使用之前必须提前指定
	if ks.Type == nil {
		return fmt.Errorf("must specify Kind.Type")
	}

	// cache 也应该在调用 Start 之前被注入了
	if ks.cache == nil {
		return fmt.Errorf("must call CacheInto on Kind before calling Start")
	}

	// 从 Cache 中获取 Informer
  // 并添加一个事件处理程序来添加队列
	i, err := ks.cache.GetInformer(context.TODO(), ks.Type)
	if err != nil {
		if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
			log.Error(err, "if kind is a CRD, it should be installed before calling Start",
				"kind", kindMatchErr.GroupKind)
		}
		return err
	}
	i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
	return nil
}