<aside> 🤡 本文主要对 Informer 中的 Reflector 组件进行分析说明。
</aside>
前面我们说了 Informer 通过对 APIServer 的资源对象执行 List 和 Watch 操作,把获取到的数据存储在本地的缓存中,其中实现这个的核心功能就是 Reflector,我们可以称其为反射器,从名字我们可以看出来它的主要功能就是反射,就是将 Etcd 里面的数据反射到本地存储(DeltaFIFO)中。Reflector 首先通过 List 操作获取所有的资源对象数据,保存到本地存储,然后通过 Watch 操作监控资源的变化,触发相应的事件处理,比如前面示例中的 Add 事件、Update 事件、Delete 事件。
Reflector 结构体的定义位于 staging/src/k8s.io/client-go/tools/cache/reflector.go
下面:
// k8s.io/client-go/tools/cache/reflector.go
// Reflector(反射器) 监听指定的资源,将所有的变化都反射到给定的存储中去
type Reflector struct {
// name 标识这个反射器的名称,默认为 文件:行数(比如reflector.go:125)
// 默认名字通过 k8s.io/apimachinery/pkg/util/naming/from_stack.go 下面的 GetNameFromCallsite 函数生成
name string
// 期望放到 Store 中的类型名称,如果提供,则是 expectedGVK 的字符串形式
// 否则就是 expectedType 的字符串,它仅仅用于显示,不用于解析或者比较。
expectedTypeName string
// An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and
// `"kind"` must also be right.
// 我们放到 Store 中的对象类型
expectedType reflect.Type
// 如果是非结构化的,我们期望放在 Sotre 中的对象的 GVK
expectedGVK *schema.GroupVersionKind
// 与 watch 源同步的目标 Store
store Store
// 用来执行 lists 和 watches 操作的 listerWatcher 接口(最重要的)
listerWatcher ListerWatcher
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
resyncPeriod time.Duration
// ShouldResync 会周期性的被调用,当返回 true 的时候,就会调用 Store 的 Resync 操作
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
// It is set based on the result of the initial list call.
paginatedResult bool
// Kubernetes 资源在 APIServer 中都是有版本的,对象的任何修改(添加、删除、更新)都会造成资源版本更新,lastSyncResourceVersion 就是指的这个版本
lastSyncResourceVersion string
// 如果之前的 list 或 watch 带有 lastSyncResourceVersion 的请求中是一个 HTTP 410(Gone)的失败请求,则 isLastSyncResourceVersionGone 为 true
isLastSyncResourceVersionGone bool
// lastSyncResourceVersionMutex 用于保证对 lastSyncResourceVersion 的读/写访问。
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from watch cache.
// NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
}
// NewReflector 创建一个新的反射器对象,将使给定的 Store 保持与服务器中指定的资源对象的内容同步。
// 反射器只把具有 expectedType 类型的对象放到 Store 中,除非 expectedType 是 nil。
// 如果 resyncPeriod 是非0,那么反射器会周期性地检查 ShouldResync 函数来决定是否调用 Store 的 Resync 操作
// `ShouldResync==nil` 意味着总是要执行 Resync 操作。
// 这使得你可以使用反射器周期性地处理所有的全量和增量的对象。
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
// 默认的反射器名称为 file:line
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector 与 NewReflector 一样,只是指定了一个 name 用于日志记录
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{}
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
}
r.setExpectedType(expectedType)
return r
}
从源码中我们可以看出来通过 NewReflector
实例化反射器的时候,必须传入一个 ListerWatcher
接口对象,这个也是反射器最核心的功能,该接口拥有 List 和 Watch 方法,用于获取和监控资源对象。
// k8s.io/client-go/tools/cache/listwatch.go
// Lister 是知道如何执行初始化List列表的对象
type Lister interface {
// List 应该返回一个列表类型的对象;
// Items 字段将被解析,ResourceVersion 字段将被用于正确启动 watch 的地方
List(options metav1.ListOptions) (runtime.Object, error)
}
// Watcher 是知道如何执行 watch 操作的任何对象
type Watcher interface {
// Watch 在指定的版本开始执行 watch 操作
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListerWatcher 是任何知道如何对一个资源执行初始化List列表和启动Watch监控操作的对象
type ListerWatcher interface {
Lister
Watcher
}
而 Reflector 对象通过 Run 函数来启动监控并处理监控事件的:
// k8s.io/client-go/tools/cache/reflector.go
// Run 函数反复使用反射器的 ListAndWatch 函数来获取所有对象和后续的 deltas。
// 当 stopCh 被关闭的时候,Run函数才会退出。
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
所以不管我们传入的 ListWatcher 对象是如何实现的 List 和 Watch 操作,只要实现了就可以,最主要的还是看 ListAndWatch 函数是如何去实现的,如何去调用 List 和 Watch 的:
// k8s.io/client-go/tools/cache/reflector.go
// ListAndWatch 函数首先列出所有的对象,并在调用的时候获得资源版本,然后使用该资源版本来进行 watch 操作。
// 如果 ListAndWatch 没有初始化 watch 成功就会返回错误。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// 如果 listWatcher 支持,会尝试 chunks(分块)收集 List 列表数据
// 如果不支持,第一个 List 列表请求将返回完整的响应数据。
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// 获得一个初始的分页结果。假定此资源和服务器符合分页请求,并保留默认的分页器大小设置
case options.ResourceVersion != "" && options.ResourceVersion != "0":
pager.PageSize = 0
}
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) {
r.setIsLastSyncResourceVersionExpired(true)
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionExpired(false) // list 成功
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
// 获取资源版本号
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// 将资源数据转换成资源对象列表,将 runtime.Object 对象转换成 []runtime.Object 对象
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
// 将资源对象列表中的资源对象和资源版本号存储在 Store 中
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup()
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
// 如果 ShouldResync 为 nil 或者调用返回true,则执行 Store 的 Resync 操作
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
// stopCh 一个停止循环的机会
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
// 设置watch的选项,因为前期列举了全量对象,从这里只要监听最新版本以后的资源就可以了
// 如果没有资源变化总不能一直挂着吧?也不知道是卡死了还是怎么了,所以设置一个超时会好一点
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
AllowWatchBookmarks: true,
}
start := r.clock.Now()
// 执行 Watch 操作
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch {
case isExpiredError(err):
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
continue
}
return nil
}
// 调用 watchHandler 来处理分发 watch 到的事件对象
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
首先通过反射器的 relistResourceVersion
函数获得反射器 relist 的资源版本,如果资源版本非 0,则表示根据资源版本号继续获取,当传输过程中遇到网络故障或者其他原因导致中断,下次再连接时,会根据资源版本号继续传输未完成的部分。可以使本地缓存中的数据与Etcd集群中的数据保持一致,该函数实现如下所示:
// k8s.io/client-go/tools/cache/reflector.go
// relistResourceVersion 决定了反射器应该list或者relist的资源版本。
// 如果最后一次relist的结果是HTTP 410(Gone)状态码,则返回"",这样relist将通过quorum读取etcd中可用的最新资源版本。
// 返回使用 lastSyncResourceVersion,这样反射器就不会使用在relist结果或watch事件中watch到的资源版本更老的资源版本进行relist了
func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionGone {
// 因为反射器会进行分页List请求,如果 lastSyncResourceVersion 过期了,所有的分页列表请求就都会跳过 watch 缓存
// 所以设置 ResourceVersion="",然后再次 List,重新建立反射器到最新的可用资源版本,从 etcd 中读取,保持一致性。
return ""
}
if r.lastSyncResourceVersion == "" {
// 反射器执行的初始 List 操作的时候使用0作为资源版本。
return "0"
}
return r.lastSyncResourceVersion
}
上面的 ListAndWatch 函数实现看上去虽然非常复杂,但其实大部分是对分页的各种情况进行处理,最核心的还是调用 r.listerWatcher.List(opts)
获取全量的资源对象,而这个 List 其实 ListerWatcher 实现的 List 方法,这个 ListerWatcher 接口实际上在该接口定义的同一个文件中就有一个 ListWatch 结构体实现了:
// k8s.io/client-go/tools/cache/listwatch.go
// ListFunc 知道如何 List 资源
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// WatchFunc 知道如何 watch 资源
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// ListWatch 结构体知道如何 list 和 watch 资源对象,它实现了 ListerWatcher 接口。
// 它为 NewReflector 使用者提供了方便的函数。其中 ListFunc 和 WatchFunc 不能为 nil。
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking 对 list watcher 请求不分块。
DisableChunking bool
}
// 列出一组 APIServer 资源
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
return lw.ListFunc(options)
}
// Watch 一组 APIServer 资源
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
当我们真正使用一个 Informer 对象的时候,实例化的时候就会调用这里的 ListWatch 来进行初始化,比如前面我们实例中使用的 Deployment Informer,
// k8s.io/client-go/informers/apps/v1/deployment.go
// NewFilteredDeploymentInformer 为 Deployment 构造一个新的 Informer。
// 总是倾向于使用一个 informer 工厂来获取一个 shared informer,而不是获取一个独立的 informer,这样可以减少内存占用和服务器的连接数。
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
},
},
&appsv1.Deployment{},
resyncPeriod,
indexers,
)
}
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
从上面代码我们就可以看出来当我们去调用一个资源对象的 Informer() 的时候,就会去调用上面的 NewFilteredDeploymentInformer
函数进行初始化,而在初始化的使用就传入了 cache.ListWatch
对象,其中就有 List 和 Watch 的实现操作,也就是说前面反射器在 ListAndWatch 里面调用的 ListWatcher 的 List 操作是在一个具体的资源对象的 Informer 中实现的,比如我们这里就是通过的 ClientSet 客户端与 APIServer 交互获取到 Deployment 的资源列表数据的,通过在 ListFunc 中的 client.AppsV1().Deployments(namespace).List(context.TODO(), options)
实现的,这下应该好理解了吧。
获取到了全量的 List 数据过后,通过 listMetaInterface.GetResourceVersion()
来获取资源的版本号,ResourceVersion(资源版本号)非常重要,Kubernetes 中所有的资源都拥有该字段,它标识当前资源对象的版本号,每次修改(CUD)当前资源对象时,Kubernetes API Server 都会更改 ResourceVersion,这样 client-go 执行 Watch 操作时可以根据ResourceVersion 来确定当前资源对象是否发生了变化。
然后通过 meta.ExtractList 函数将资源数据转换成资源对象列表,将 runtime.Object
对象转换成 []runtime.Object
对象,因为全量获取的是一个资源列表。
接下来是通过反射器的 syncWith
函数将资源对象列表中的资源对象和资源版本号存储在 Store 中,这个会在后面的章节中详细说明。