<aside> 🤡 本文主要对 Informer 中的 DeltaFIFO 组件进行分析说明。
</aside>
上节课我们讲到 Reflector 中通过 ListAndWatch 获取到数据后传入到了本地的存储中,也就是 DeltaFIFO 中。从 DeltaFIFO 的名字可以看出它是一个 FIFO,也就是一个先进先出的队列,而 Delta 表示的是变化的资源对象存储,包含操作资源对象的类型和数据,Reflector 就是这个队列的生产者。
在了解 DeltaFIFO 之前我们需要先具体了解下什么是 Delta,我们先来看看 client-go 中是如何定义的,Delta 的数据结构定义位于staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
文件中。
// k8s.io/client-go/tools/cache/delta_fifo.go
// DeltaType 是变化的类型(添加、删除等)
type DeltaType string
// 变化的类型定义
const (
Added DeltaType = "Added" // 增加
Updated DeltaType = "Updated" // 更新
Deleted DeltaType = "Deleted" // 删除
// 当遇到 watch 错误,不得不进行重新list时,就会触发 Replaced。
// 我们不知道被替换的对象是否发生了变化。
//
// 注意:以前版本的 DeltaFIFO 也会对 Replace 事件使用 Sync。
// 所以只有当选项 EmitDeltaTypeReplaced 为真时才会触发 Replaced。
Replaced DeltaType = "Replaced"
// Sync 是针对周期性重新同步期间的合成事件
Sync DeltaType = "Sync" // 同步
)
// Delta 是 DeltaFIFO 存储的类型。
// 它告诉你发生了什么变化,以及变化后对象的状态。
//
// [*] 除非变化是删除操作,否则你将得到对象被删除前的最终状态。
type Delta struct {
Type DeltaType
Object interface{}
}
Delta 其实就是 Kubernetes 系统中带有变化类型的资源对象,如下图所示:
其实也非常好理解,比如我们现在添加了一个 Pod,那么这个 Delta 就是带有 Added 这个类型的 Pod,如果是删除了一个 Deployment,那么这个 Delta 就是带有 Deleted 类型的 Deployment,为什么要带上类型呢?因为我们需要根据不同的类型去执行不同的操作,增加、更新、删除的动作显然是不一样的。
上面我们解释了什么是 Delta,接下来需要说下 FIFO,我们说 FIFO 很好理解,就是一个先进先出的队列,Reflector 是其生产者,其数据结构定义位于 staging/src/k8s.io/client-go/tools/cache/fifo.go
文件中:
// k8s.io/client-go/tools/cache/fifo.go
type FIFO struct {
lock sync.RWMutex
cond sync.Cond
// items 中的每一个 key 也在 queue 中
items map[string]interface{}
queue []string
// 如果第一批 items 被 Replace() 插入或者先调用了 Deleta/Add/Update
// 则 populated 为 true。
populated bool
// 第一次调用 Replace() 时插入的 items 数
initialPopulationCount int
// keyFunc 用于生成排队的 item 插入和检索的 key。
keyFunc KeyFunc
// 标识队列已关闭,以便在队列清空时控制循环可以退出。
closed bool
closedLock sync.Mutex
}
var (
_ = Queue(&FIFO{}) // FIFO 是一个 Queue
)
上面的 FIFO 数据结构中定义了 items 和 queue 两个属性来保存队列中的数据,其中 queue 中存的是资源对象的 key 列表,而 items 是一个 map 类型,其 key 就是 queue 中保存的 key,value 值是真正的资源对象数据。既然是先进进去的队列,那么就要具有队列的基本功能,结构体下面其实就有一个类型断言,表示当前的 FIFO 实现了 Queue 这个接口,所以 FIFO 要实现的功能都是在 Queue 中定义的,Queue 接口和 FIFO 位于同一文件中:
// k8s.io/client-go/tools/cache/fifo.go
// Queue 扩展了 Store // with a collection of Store keys to "process".
// 每一次添加、更新或删除都可以将对象的key放入到该集合中。
// Queue 具有使用给定的 accumulator 来推导出相应的 key 的方法
// Queue 可以从多个 goroutine 中并发访问
// Queue 可以被关闭,之后 Pop 操作会返回一个错误
type Queue interface {
Store
// Pop 一直阻塞,直到至少有一个key要处理或队列被关闭,队列被关闭会返回一个错误。
// 在前面的情况下 Pop 原子性地选择一个 key 进行处理,从 Store 中删除关联(key、accumulator)的数据,
// 并处理 accumulator。Pop 会返回被处理的 accumulator 和处理的结果。
// PopProcessFunc 函数可以返回一个 ErrRequeue{inner},在这种情况下,Pop 将
//(a)把那个(key,accumulator)关联作为原子处理的一部分返回到 Queue 中
// (b) 从 Pop 返回内部错误。
Pop(PopProcessFunc) (interface{}, error)
// 仅当该 key 尚未与一个非空的 accumulator 相关联的时候,AddIfNotPresent 将给定的 accumulator 放入 Queue(与 accumulator 的 key 相关联的)
AddIfNotPresent(interface{}) error
// 如果第一批 keys 都已经 Popped,则 HasSynced 返回 true。
// 如果在添加、更新、删除之前发生了第一次 Replace 操作,则第一批 keys 为 true
// 否则为空。
HasSynced() bool
// 关闭该队列
Close()
}
从上面的定义中可以看出 Queue 这个接口扩展了 Store 这个接口,这个就是前面我们说的本地存储,队列实际上也是一种存储,然后在 Store 的基础上增加 Pop、AddIfNotPresent、HasSynced、Close 4个函数就变成了 Queue 队列了,所以我们优先来看下 Store 这个接口的定义,该数据结构定义位于文件 k8s.io/client-go/tools/cache/store.go
中:
// k8s.io/client-go/tools/cache/store.go
// Store 是一个通用的对象存储和处理的接口。
// Store 包含一个从字符串 keys 到 accumulators 的映射,并具有 to/from 当前
// 给定 key 关联的 accumulators 添加、更新和删除给定对象的操作。
// 一个 Store 还知道如何从给定的对象中获取 key,所以很多操作只提供对象。
//
// 在最简单的 Store 实现中,每个 accumulator 只是最后指定的对象,或者删除后为空,
// 所以 Store 只是简单的存储。
//
// Reflector 反射器知道如何 watch 一个服务并更新一个 Store 存储,这个包提供了 Store 的各种实现。
type Store interface {
// Add 将指定对象添加到与指定对象的 key 相关的 accumulator(累加器)中。
Add(obj interface{}) error
// Update 与指定对象的 key 相关的 accumulator 中更新指定的对象
Update(obj interface{}) error
// Delete 根据指定的对象 key 删除指定的对象
Delete(obj interface{}) error
// List 返回当前所有非空的 accumulators 的列表
List() []interface{}
// ListKeys 返回当前与非空 accumulators 关联的所有 key 的列表
ListKeys() []string
// Get 根据指定的对象获取关联的 accumulator
Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey 根据指定的对象 key 获取关联的 accumulator
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace 会删除原来Store中的内容,并将新增的list的内容存入Store中,即完全替换数据
// Store 拥有 list 列表的所有权,在调用此函数后,不应该引用它了。
Replace([]interface{}, string) error
// Resync 在 Store 中没有意义,但是在 DeltaFIFO 中有意义。
Resync() error
}
// KeyFunc 就是从一个对象中生成一个唯一的 Key 的函数,上面的 FIFO 中就有用到
type KeyFunc func(obj interface{}) (string, error)
// MetaNamespaceKeyFunc 是默认的 KeyFunc,生成的 key 格式为:
// <namespace>/<name>
// 如果是全局的,则namespace为空,那么生成的 key 就是 <name>
// 当然要从 key 拆分出 namespace 和 name 也非常简单
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
Store 就是一个通用的对象存储和处理的接口,可以用来写入对象和获取对象。其中 cache 数据结构就实现了上面的 Store 接口,但是这个属于后面的 Indexer 部分的知识点,这里我们就不展开说明了。
我们说 Queue 扩展了 Store 接口,所以 Queue 本身也是一个存储,只是在存储的基础上增加了 Pop 这样的函数来实现弹出对象,是不是就变成了一个队列了。
FIFO 就是一个具体的 Queue 实现,按照顺序弹出对象是不是就是一个先进先出的队列了?如下图所示: