<aside> 🤡 本文主要对 WorkQueue 组件进行分析说明。

</aside>

介绍

前面我们把 Informer 的整个流程完整分析了一遍,我们可以通过监听对象的变化,将资源对象写入到事件处理器的回调函数中,但是如果我们直接在回调函数中处理这些数据会比较慢,对于这种情况往往我们就会使用队列来接收这些数据,然后再通过其他协程去处理这些数据,可以大大加快数据的处理速度。这个其实和 channel 有点类似,但是 channel 功能过于单一,无法满足各类场景的需求,比如限制数据队列的写入速度。

为此在 client-go 中单独提供了一个 workqueue 的组件来实现队列的功能,由于 Kubernetes 很多模块都有队列的需求,所以统一实现在了 client-go 中,不仅可以用于 Kubernetes 内部,同时也可以供调用 client-go 的模块使用。client-go 中抽象了几种队列,包括通用队列、限速队列、延时队列等等。

通用队列

首先我们来看下 client-go 中队列是如何定义的,代码位于 k8s.io/client-go/util/workqueue/queue.go 文件中:

// k8s.io/client-go/util/workqueue/queue.go

// 队列接口定义
type Interface interface {
	Add(item interface{})  // 向队列中添加一个元素
	Len() int  // 获取队列长度
	Get() (item interface{}, shutdown bool)  // 获取队列头部的元素,第二个返回值表示队列是否已经关闭
	Done(item interface{})  // 标记队列中元素已经处理完
	ShutDown()  // 关闭队列
	ShuttingDown() bool  // 查询队列是否正在关闭
}

既然是接口,那么肯定就有对应的实现,在当前文件中有一个名为 Type 的结构体,就实现了上面的队列接口:

// k8s.io/client-go/util/workqueue/queue.go

// Type 是一个 workqueue。
type Type struct {
  // 定义实际存储元素的地方,用 slice 保证处理元素的顺序,
  // 队列中的每个元素都应该在 dirty 集合中,而不是在 processing 集合中。
	queue []t  // 队列中的元素可以是任意类型的元素

	// dirty 定义了所有需要处理的元素。
  // dirty 这个词应该类似 linux 内存管理中的 dirty,表示在内存(dirty)中但未写入硬盘(queue)的数据
	dirty set

  // 当前正在处理的元素都在 processing 集合中。
  // 这些元素可能同时在 dirty 集合中,当我们处理完某个元素,从这个集合中删除时,我们会检查它是否在 dirty 集合中,如果在,就把它加入队列。
	processing set

	cond *sync.Cond

	shuttingDown bool  // 关闭标志

	metrics queueMetrics  // 用于 Prometheus 监控指标

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.Clock
}

// 空结构体
type empty struct{}
// 空接口,表示任意类型
type t interface{}  
// set 集合,用 map 来模拟 set 集合(元素不重复),集合中的元素是任意类型
type set map[t]empty

该结构体中最重要的3个属性是 queue、dirty、processing,我们可以先来看下该队列是如何实现上面的接口的,然后再理解这几个字段的关系就比较容易了。首先看下 Add 函数的实现:

// k8s.io/client-go/util/workqueue/queue.go

// Add 标记 item 元素为需要处理
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
  // 如果队列正在关闭,那么就直接返回了
	if q.shuttingDown {
		return
	}
  // 已经在脏数据集合中,也直接返回
	if q.dirty.has(item) {
		return
	}
  
	q.metrics.add(item)
  // 添加到脏数据集合中
	q.dirty.insert(item) 
  // 数据已经在正在处理的集合中了,那么也直接返回了
	if q.processing.has(item) {
		return
	}
  // 添加到元素队列的尾部
	q.queue = append(q.queue, item)
  // 通知其他协程解除阻塞
	q.cond.Signal()
}

上面的添加元素的函数并不是直接将元素存储在 queue 集合中,而是先添加到 dirty 集合中,这个地方的 dirty 集合主要是用来判断元素是否已经存在了,我们直接通过 queue 集合当然也可以判断,但是需要遍历一次,效率太低,而 dirty 是一个 set 集合(map 实现的),用来判断是否存在肯定是最快的方式了,所以如果数据在脏数据的集合中那么就不处理了。如果没在脏数据集合中那么就添加进去,还有一种情况是我们添加的这个元素正在处理,但是还没有调用 Done() 函数,也就是这个元素正在处理,此时再添加当前的元素应该是最新的,处理中的是过时的了,也就是脏的。

接下来看看 Get 函数是如何实现的:

// k8s.io/client-go/util/workqueue/queue.go

// 阻塞,直到可以获取头部的元素进行处理
// 如果 shutdown=true,调用者应该结束他们的协程
// 当处理完一个元素后,必须调用 Done() 函数
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
  // 队列没有关闭并且没有数据的时候,阻塞协程
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
  // 协程被激活了,但是没有数据,则返回关闭状态
	if len(q.queue) == 0 {
		return nil, true
	}
  // 弹出第一个元素
	item, q.queue = q.queue[0], q.queue[1:]

	q.metrics.get(item)
  // 将当前元素加入到正在处理的集合中
	q.processing.insert(item)
  // 同时从脏数据集合中移除
	q.dirty.delete(item)

	return item, false
}

Get() 函数的实现比较简单,就是从真正的队列 queue 中取出第一个元素,取出来要放到正在处理的集合中,并从脏数据集合中移除。然后一个比较重要的函数就是 Done() 了,用来标记元素处理完成了:

// k8s.io/client-go/util/workqueue/queue.go

// 完成标志着元素已经处理完毕,如果在处理过程中又被标记为脏数据,则会重新加入队列进行重新处理。
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
  
	q.metrics.done(item)
  // 从正在处理的集合中删除,因为已经处理完了
	q.processing.delete(item)
  // 判断脏数据集合中是否有这个元素
  // 因为在处理的期间很可能元素又被添加了
  // 那么需要重新放到队列中去进行处理
	if q.dirty.has(item) {
    // 放回队列
		q.queue = append(q.queue, item)
    // 激活协程
		q.cond.Signal()
	}
}

Done() 函数的作用就是标记元素为已经处理完成了,只是在处理元素的期间很有可能该元素又被添加进来了,出现再了脏数据集合中,那么就需要重新放回数据队列进行处理。其他的几个函数实现非常简单:

// k8s.io/client-go/util/workqueue/queue.go

// 返回当前队列中的元素长度
func (q *Type) Len() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	return len(q.queue)
}

// 关闭队列,ShutDown 会使队列忽略所有新添加的元素
// 一旦工作协程排空了队列中的现有元素,它们就会被标记为退出。
func (q *Type) ShutDown() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
  // 标记为关闭
	q.shuttingDown = true
	q.cond.Broadcast()
}

// 队列是否在关闭
func (q *Type) ShuttingDown() bool {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	return q.shuttingDown
}

通过队上面队列的实现分析,我们可以用下图来进行说明。