<aside> ⏰ 本文主要介绍调度器中的调度插件运行的工作原理。
</aside>
前面我们是在整体上对 Pod 的调度流程进行了分析,但是真正核心的调度操作才是我们最需要关心的,也就是上面提到的 sched.Algorithm.Schedule()
函数的实现。
这里需要关注 Scheduler 下面的 Algorithm 属性,该属性是在初始化调度器的时候传入的:
// pkg/scheduler/scheduler.go
type Scheduler struct {
Algorithm core.ScheduleAlgorithm
......
}
// pkg/scheduler/factory.go
// 初始化 Scheduler
func (c *Configurator) create() (*Scheduler, error) {
......
algo := core.NewGenericScheduler(
c.schedulerCache,
c.nodeInfoSnapshot,
extenders,
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
c.disablePreemption,
c.percentageOfNodesToScore,
)
return &Scheduler{
Algorithm: algo,
......
}, nil
}
// pkg/scheduler/core/generic_scheduler.go
// ScheduleAlgorithm 是一个知道如何将 Pod 调度到节点上去的接口
type ScheduleAlgorithm interface {
Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
Extenders() []framework.Extender
}
// NewGenericScheduler 创建一个 genericScheduler 对象
func NewGenericScheduler(
cache internalcache.Cache,
nodeInfoSnapshot *internalcache.Snapshot,
extenders []framework.Extender,
pvcLister corelisters.PersistentVolumeClaimLister,
disablePreemption bool,
percentageOfNodesToScore int32) ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
extenders: extenders,
nodeInfoSnapshot: nodeInfoSnapshot,
pvcLister: pvcLister,
disablePreemption: disablePreemption,
percentageOfNodesToScore: percentageOfNodesToScore,
}
}
从定义上来看可以知道 Algorithm 是一个 ScheduleAlgorithm
接口,在初始化的时候我们使用的 core.NewGenericScheduler()
来初始化 Algorithm,证明这个函数返回的 genericScheduler 对象一定会实现 ScheduleAlgorithm
接口,所以我们在 scheduleOne 函数里面真正去调度的时候调用的 sched.Algorithm.Schedule()
函数是 genericScheduler
中的 Schedule() 方法。
下面我们来分析下 genericScheduler
中的 Schedule() 函数的实现:
// pkg/scheduler/core/generic_scheduler.go
// Schedule 尝试将指定的 Pod 调度到一系列节点中的一个节点上去。
// 如果调度成功,将返回该节点名称
// 如果调度失败,将返回一个带有失败原因的 FitError
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// 检查最基本的条件
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
// 完成调度器缓存和节点信息的快照
if err := g.snapshot(); err != nil {
return result, err
}
// 判断当前快照中的节点数是否为0
if g.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
// 预选,先找到一些符合基本条件的节点
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
if err != nil {
return result, err
}
// 没有找到合适的
if len(feasibleNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
FilteredNodesStatuses: filteredNodesStatuses,
}
}
// 如果预选过后只有1个节点,那么就直接返回这个节点信息就行了
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
}
// 如果不止1个节点,那么就需要进行优选,给每个节点进行打分
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
if err != nil {
return result, err
}
// 选择分数最高的作为最终的节点
host, err := g.selectHost(priorityList)
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
}
整个核心调度的实现流程很简单:
这里我们重点关注的是预选和优选两个阶段的实现
预选阶段调用 g.findNodesThatFitPod()
函数来获取一批合适的带调度的节点。函数实现如下所示:
// pkg/scheduler/core/generic_scheduler.go
// 根据框架的过滤插件和过滤扩展器对节点进行过滤,找到适合 Pod 的节点
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
filteredNodesStatuses := make(framework.NodeToStatusMap)
// 运行 "prefilter" 插件
s := prof.RunPreFilterPlugins(ctx, state, pod)
if !s.IsSuccess() {
if !s.IsUnschedulable() {
return nil, nil, s.AsError()
}
// 更新节点的状态
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, nil, err
}
for _, n := range allNodes {
filteredNodesStatuses[n.Node().Name] = s
}
return nil, filteredNodesStatuses, nil
}
// 通过 Filter 插件找到合适的节点
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
// 通过 Extenders 过滤合适的节点
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
return feasibleNodes, filteredNodesStatuses, nil
}