<aside> 🤡 本文主要对 WorkQueue 组件进行分析说明。
</aside>
前面我们把 Informer 的整个流程完整分析了一遍,我们可以通过监听对象的变化,将资源对象写入到事件处理器的回调函数中,但是如果我们直接在回调函数中处理这些数据会比较慢,对于这种情况往往我们就会使用队列来接收这些数据,然后再通过其他协程去处理这些数据,可以大大加快数据的处理速度。这个其实和 channel 有点类似,但是 channel 功能过于单一,无法满足各类场景的需求,比如限制数据队列的写入速度。
为此在 client-go 中单独提供了一个 workqueue 的组件来实现队列的功能,由于 Kubernetes 很多模块都有队列的需求,所以统一实现在了 client-go 中,不仅可以用于 Kubernetes 内部,同时也可以供调用 client-go 的模块使用。client-go 中抽象了几种队列,包括通用队列、限速队列、延时队列等等。
首先我们来看下 client-go 中队列是如何定义的,代码位于 k8s.io/client-go/util/workqueue/queue.go
文件中:
// k8s.io/client-go/util/workqueue/queue.go
// 队列接口定义
type Interface interface {
Add(item interface{}) // 向队列中添加一个元素
Len() int // 获取队列长度
Get() (item interface{}, shutdown bool) // 获取队列头部的元素,第二个返回值表示队列是否已经关闭
Done(item interface{}) // 标记队列中元素已经处理完
ShutDown() // 关闭队列
ShuttingDown() bool // 查询队列是否正在关闭
}
既然是接口,那么肯定就有对应的实现,在当前文件中有一个名为 Type
的结构体,就实现了上面的队列接口:
// k8s.io/client-go/util/workqueue/queue.go
// Type 是一个 workqueue。
type Type struct {
// 定义实际存储元素的地方,用 slice 保证处理元素的顺序,
// 队列中的每个元素都应该在 dirty 集合中,而不是在 processing 集合中。
queue []t // 队列中的元素可以是任意类型的元素
// dirty 定义了所有需要处理的元素。
// dirty 这个词应该类似 linux 内存管理中的 dirty,表示在内存(dirty)中但未写入硬盘(queue)的数据
dirty set
// 当前正在处理的元素都在 processing 集合中。
// 这些元素可能同时在 dirty 集合中,当我们处理完某个元素,从这个集合中删除时,我们会检查它是否在 dirty 集合中,如果在,就把它加入队列。
processing set
cond *sync.Cond
shuttingDown bool // 关闭标志
metrics queueMetrics // 用于 Prometheus 监控指标
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
// 空结构体
type empty struct{}
// 空接口,表示任意类型
type t interface{}
// set 集合,用 map 来模拟 set 集合(元素不重复),集合中的元素是任意类型
type set map[t]empty
该结构体中最重要的3个属性是 queue、dirty、processing,我们可以先来看下该队列是如何实现上面的接口的,然后再理解这几个字段的关系就比较容易了。首先看下 Add 函数的实现:
// k8s.io/client-go/util/workqueue/queue.go
// Add 标记 item 元素为需要处理
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果队列正在关闭,那么就直接返回了
if q.shuttingDown {
return
}
// 已经在脏数据集合中,也直接返回
if q.dirty.has(item) {
return
}
q.metrics.add(item)
// 添加到脏数据集合中
q.dirty.insert(item)
// 数据已经在正在处理的集合中了,那么也直接返回了
if q.processing.has(item) {
return
}
// 添加到元素队列的尾部
q.queue = append(q.queue, item)
// 通知其他协程解除阻塞
q.cond.Signal()
}
上面的添加元素的函数并不是直接将元素存储在 queue 集合中,而是先添加到 dirty 集合中,这个地方的 dirty 集合主要是用来判断元素是否已经存在了,我们直接通过 queue 集合当然也可以判断,但是需要遍历一次,效率太低,而 dirty 是一个 set 集合(map 实现的),用来判断是否存在肯定是最快的方式了,所以如果数据在脏数据的集合中那么就不处理了。如果没在脏数据集合中那么就添加进去,还有一种情况是我们添加的这个元素正在处理,但是还没有调用 Done() 函数,也就是这个元素正在处理,此时再添加当前的元素应该是最新的,处理中的是过时的了,也就是脏的。
接下来看看 Get 函数是如何实现的:
// k8s.io/client-go/util/workqueue/queue.go
// 阻塞,直到可以获取头部的元素进行处理
// 如果 shutdown=true,调用者应该结束他们的协程
// 当处理完一个元素后,必须调用 Done() 函数
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 队列没有关闭并且没有数据的时候,阻塞协程
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
// 协程被激活了,但是没有数据,则返回关闭状态
if len(q.queue) == 0 {
return nil, true
}
// 弹出第一个元素
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
// 将当前元素加入到正在处理的集合中
q.processing.insert(item)
// 同时从脏数据集合中移除
q.dirty.delete(item)
return item, false
}
Get() 函数的实现比较简单,就是从真正的队列 queue 中取出第一个元素,取出来要放到正在处理的集合中,并从脏数据集合中移除。然后一个比较重要的函数就是 Done() 了,用来标记元素处理完成了:
// k8s.io/client-go/util/workqueue/queue.go
// 完成标志着元素已经处理完毕,如果在处理过程中又被标记为脏数据,则会重新加入队列进行重新处理。
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
// 从正在处理的集合中删除,因为已经处理完了
q.processing.delete(item)
// 判断脏数据集合中是否有这个元素
// 因为在处理的期间很可能元素又被添加了
// 那么需要重新放到队列中去进行处理
if q.dirty.has(item) {
// 放回队列
q.queue = append(q.queue, item)
// 激活协程
q.cond.Signal()
}
}
Done() 函数的作用就是标记元素为已经处理完成了,只是在处理元素的期间很有可能该元素又被添加进来了,出现再了脏数据集合中,那么就需要重新放回数据队列进行处理。其他的几个函数实现非常简单:
// k8s.io/client-go/util/workqueue/queue.go
// 返回当前队列中的元素长度
func (q *Type) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue)
}
// 关闭队列,ShutDown 会使队列忽略所有新添加的元素
// 一旦工作协程排空了队列中的现有元素,它们就会被标记为退出。
func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 标记为关闭
q.shuttingDown = true
q.cond.Broadcast()
}
// 队列是否在关闭
func (q *Type) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}
通过队上面队列的实现分析,我们可以用下图来进行说明。