<aside> ⏰ 本文主要介绍调度器中的调度插件运行的工作原理。

</aside>

前面我们是在整体上对 Pod 的调度流程进行了分析,但是真正核心的调度操作才是我们最需要关心的,也就是上面提到的 sched.Algorithm.Schedule() 函数的实现。

这里需要关注 Scheduler 下面的 Algorithm 属性,该属性是在初始化调度器的时候传入的:

// pkg/scheduler/scheduler.go
type Scheduler struct {
	Algorithm core.ScheduleAlgorithm
  ......
}

// pkg/scheduler/factory.go

// 初始化 Scheduler
func (c *Configurator) create() (*Scheduler, error) {
	......
	algo := core.NewGenericScheduler(
		c.schedulerCache,
		c.nodeInfoSnapshot,
		extenders,
		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
		c.disablePreemption,
		c.percentageOfNodesToScore,
	)

	return &Scheduler{
		Algorithm:       algo,
		......
	}, nil
}

// pkg/scheduler/core/generic_scheduler.go

// ScheduleAlgorithm 是一个知道如何将 Pod 调度到节点上去的接口
type ScheduleAlgorithm interface {
	Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
	Extenders() []framework.Extender
}

// NewGenericScheduler 创建一个 genericScheduler 对象
func NewGenericScheduler(
	cache internalcache.Cache,
	nodeInfoSnapshot *internalcache.Snapshot,
	extenders []framework.Extender,
	pvcLister corelisters.PersistentVolumeClaimLister,
	disablePreemption bool,
	percentageOfNodesToScore int32) ScheduleAlgorithm {
	return &genericScheduler{
		cache:                    cache,
		extenders:                extenders,
		nodeInfoSnapshot:         nodeInfoSnapshot,
		pvcLister:                pvcLister,
		disablePreemption:        disablePreemption,
		percentageOfNodesToScore: percentageOfNodesToScore,
	}
}

从定义上来看可以知道 Algorithm 是一个 ScheduleAlgorithm 接口,在初始化的时候我们使用的 core.NewGenericScheduler() 来初始化 Algorithm,证明这个函数返回的 genericScheduler 对象一定会实现 ScheduleAlgorithm 接口,所以我们在 scheduleOne 函数里面真正去调度的时候调用的 sched.Algorithm.Schedule() 函数是 genericScheduler 中的 Schedule() 方法。

Schedule

下面我们来分析下 genericScheduler 中的 Schedule() 函数的实现:

// pkg/scheduler/core/generic_scheduler.go

// Schedule 尝试将指定的 Pod 调度到一系列节点中的一个节点上去。
// 如果调度成功,将返回该节点名称
// 如果调度失败,将返回一个带有失败原因的 FitError 
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
  // 检查最基本的条件
	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
		return result, err
	}
  
  // 完成调度器缓存和节点信息的快照
	if err := g.snapshot(); err != nil {
		return result, err
	}
 
  // 判断当前快照中的节点数是否为0
	if g.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}
  
  // 预选,先找到一些符合基本条件的节点
	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
	if err != nil {
		return result, err
	}
  // 没有找到合适的
	if len(feasibleNodes) == 0 {
		return result, &FitError{
			Pod:                   pod,
			NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
			FilteredNodesStatuses: filteredNodesStatuses,
		}
	}
  
  // 如果预选过后只有1个节点,那么就直接返回这个节点信息就行了
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}
  
  // 如果不止1个节点,那么就需要进行优选,给每个节点进行打分
	priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

  // 选择分数最高的作为最终的节点
	host, err := g.selectHost(priorityList)
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

整个核心调度的实现流程很简单:

  1. 进行一些最基本的调度检查
  2. 对调度器缓存和节点信息快照操作
  3. 首先进行预选操作,找到一批合适的待调度的节点
  4. 如果没有找到节点,返回 FitError 错误
  5. 如果只找到一个节点,则直接返回这个节点的信息
  6. 如果找到多个节点,则进行优选操作,为每个节点进行打分,选择一个分数最高的作为待调度的节点进行返回

这里我们重点关注的是预选优选两个阶段的实现

预选

预选阶段调用 g.findNodesThatFitPod() 函数来获取一批合适的带调度的节点。函数实现如下所示:

// pkg/scheduler/core/generic_scheduler.go

// 根据框架的过滤插件和过滤扩展器对节点进行过滤,找到适合 Pod 的节点
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
	filteredNodesStatuses := make(framework.NodeToStatusMap)

	// 运行 "prefilter" 插件
	s := prof.RunPreFilterPlugins(ctx, state, pod)
	if !s.IsSuccess() {
		if !s.IsUnschedulable() {
			return nil, nil, s.AsError()
		}
		// 更新节点的状态
		allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
		if err != nil {
			return nil, nil, err
		}
		for _, n := range allNodes {
			filteredNodesStatuses[n.Node().Name] = s
		}
		return nil, filteredNodesStatuses, nil
	}
  // 通过 Filter 插件找到合适的节点
	feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
	if err != nil {
		return nil, nil, err
	}
  // 通过 Extenders 过滤合适的节点
	feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
	if err != nil {
		return nil, nil, err
	}
	return feasibleNodes, filteredNodesStatuses, nil
}