本节主要介绍 controller-runtime 框架的基本使用与原理。
controller-runtime(https://github.com/kubernetes-sigs/controller-runtime) 框架实际上是社区帮我们封装的一个控制器处理的框架,底层核心实现原理和我们前面去自定义一个 controller 控制器逻辑是一样的,只是在这个基础上新增了一些概念,开发者直接使用这个框架去开发控制器会更加简单方便。包括 kubebuilder、operator-sdk 这些框架其实都是在 controller-runtime 基础上做了一层封装,方便开发者快速生成项目的脚手架而已。下面我们就来分析下 controller-runtime 是如何实现的控制器处理。
首先我们还是去查看下控制器的定义以及控制器是如何启动的。控制器的定义结构体如下所示:
// pkg/internal/controller/controller.go
type Controller struct {
// Name 用于跟踪、记录和监控中控制器的唯一标识,必填字段
Name string
// 可以运行的最大并发 Reconciles 数量,默认值为1
MaxConcurrentReconciles int
// Reconciler 是一个可以随时调用对象的 Name/Namespace 的函数
// 确保系统的状态与对象中指定的状态一致,默认为 DefaultReconcileFunc 函数
Do reconcile.Reconciler
// 一旦控制器准备好启动,MakeQueue 就会为这个控制器构造工作队列
MakeQueue func() workqueue.RateLimitingInterface
// 队列通过监听来自 Infomer 的事件,添加对象键到队列中进行处理
// MakeQueue 属性就是来构造这个工作队列的
// 也就是前面我们讲解的工作队列,我们将通过这个工作队列来进行消费
Queue workqueue.RateLimitingInterface
// SetFields 用来将依赖关系注入到其他对象,比如 Sources、EventHandlers 以及 Predicates
SetFields func(i interface{}) error
// 控制器同步信号量
mu sync.Mutex
// 允许测试减少 JitterPeriod,使其更快完成
JitterPeriod time.Duration
// 控制器是否已经启动
Started bool
// TODO(community): Consider initializing a logger with the Controller Name as the tag
// startWatches 维护了一个 sources、handlers 以及 predicates 列表以方便在控制器启动的时候启动
startWatches []watchDescription
// 日志记录
Log logr.Logger
}
上面的结构体就是 controller-runtime 中定义的控制器结构体,我们可以看到结构体中仍然有一个限速的工作队列,但是看上去没有资源对象的 Informer 或者 Indexer 的数据,实际上这里是通过下面的 startWatches 属性做了一层封装,该属性是一个 watchDescription 队列,一个 watchDescription 包含了所有需要 watch 的信息:
// pkg/internal/controller/controller.go
// watchDescription 包含所有启动 watch 操作所需的信息
type watchDescription struct {
src source.Source
handler handler.EventHandler
predicates []predicate.Predicate
}
整个控制器中最重要的两个函数是 Watch 与 Start,下面我们就来分析下他们是如何实现的。
// pkg/internal/controller/controller.go
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
c.mu.Lock()
defer c.mu.Unlock()
// 注入 Cache 到参数中
if err := c.SetFields(src); err != nil {
return err
}
if err := c.SetFields(evthdler); err != nil {
return err
}
for _, pr := range prct {
if err := c.SetFields(pr); err != nil {
return err
}
}
// Controller 还没启动,把 watches 存放到本地然后返回
//
// 这些 watches 会被保存到控制器结构体中,直到调用 Start(...) 函数
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
c.Log.Info("Starting EventSource", "source", src)
// 调用 src 的 Start 函数
return src.Start(evthdler, c.Queue, prct...)
}
上面的 Watch 函数可以看到最终是去调用的 Source 这个参数的 Start 函数,Source 是事件的源,如对资源对象的 Create、Update、Delete 操作,需要由 event.EventHandlers
将 reconcile.Requests
入队列进行处理。
// pkg/source/source.go
type Source interface {
// Start 是一个内部函数
// 只应该由 Controller 调用,向 Informer 注册一个 EventHandler
// 将 reconcile.Request 放入队列
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
我们可以看到 source.Source
是一个接口,它是 Controller.Watch
的一个参数,所以要看具体的如何实现的 Source.Start 函数,我们需要去看传入 Controller.Watch
的参数,在 controller-runtime 中调用控制器的 Watch 函数的入口实际上位于 pkg/builder/controller.go
文件中的 doWatch()
函数:
// pkg/builder/controller.go
func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.forInput.object}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
err := blder.ctrl.Watch(src, hdler, allPredicates...)
if err != nil {
return err
}
......
return nil
}
可以看到 Watch 的第一个参数是一个 source.Kind
的类型,该结构体就实现了上面的 source.Source
接口:
// pkg/source/source.go
// Kind 用于提供来自集群内部的事件源,这些事件来自于 Watches(例如 Pod Create 事件)
type Kind struct {
// Type 是 watch 对象的类型,比如 &v1.Pod{}
Type runtime.Object
// cache 用于 watch 的 APIs 接口
cache cache.Cache
}
// 真正的 Start 函数实现
func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Type 在使用之前必须提前指定
if ks.Type == nil {
return fmt.Errorf("must specify Kind.Type")
}
// cache 也应该在调用 Start 之前被注入了
if ks.cache == nil {
return fmt.Errorf("must call CacheInto on Kind before calling Start")
}
// 从 Cache 中获取 Informer
// 并添加一个事件处理程序来添加队列
i, err := ks.cache.GetInformer(context.TODO(), ks.Type)
if err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
}
return err
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}