<aside> ⏰ 本文主要介绍调度器中的调度插件 NodeResourcesFit 的工作原理。

</aside>

prefilter 插件的主要作用进行一些预置的检查和为后面的扩展点提前准备数据,后续插件需要的状态数据都是通过 CycleState 来进行存储和检索的,一个插件存储的状态数据可以被另一个插件读取、修改或删除。

PreFilter

比如这里我们选择 NodeResourcesFit 这个插件来进行说明,该插件的核心方法就是实现 PreFilter 函数:

// pkg/scheduler/framework/plugins/noderesources/fit.go

const (
	// 定义的插件名称
	FitName = "NodeResourcesFit"

	// preFilterStateKey 是存放在 CycleState 中的关于 NodeResourcesFit 预计算数据的 key
	preFilterStateKey = "PreFilter" + FitName
)

// computePodResourceRequest 返回一个涵盖每个资源维度中最大宽度的 framework.Resource。
// 因为 initContainers 是按照顺序运行的,所以我们循环收集每个维度中的最大值;
// 相反,由于普通容器是同时运行的,所以我们对它们的资源向量是进行求和计算。

// 此外如果启用了 PodOverhead 这个特性并且指定了 Pod Overhead,
// 则也需要为 Overhead 定义的资源将被添加到计算的 Resource 请求总和上。
//
// 示例:
//
// Pod:
//   InitContainers  初始化容器
//     IC1:
//       CPU: 2
//       Memory: 1G
//     IC2:
//       CPU: 2
//       Memory: 3G
//   Containers  普通容器
//     C1:
//       CPU: 2
//       Memory: 1G
//     C2:
//       CPU: 1
//       Memory: 1G
//
// Result: CPU: 3, Memory: 3G
// 初始化容器:IC1和IC2是顺序执行,所以获取两个中最大的资源,即:CPU:2,Memory:3G
// 普通容器:C1和C2是同时运行的,所以需要的资源是两者之和:CPU:2+1=3,Memory:1+1=2G
// 最后需要的资源请求是初始化容器和普通容器中最大的:CPU:3,Memory:3G
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
	result := &preFilterState{}
  // 普通容器Requests资源相加
	for _, container := range pod.Spec.Containers {
		result.Add(container.Resources.Requests)
	}

	// take max_resource(sum_pod, any_init_container)
	for _, container := range pod.Spec.InitContainers {
		result.SetMaxResource(container.Resources.Requests)
	}

	// 如果正在使用 Overhead 特性,则也需要计算到总和里面
	if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
		result.Add(pod.Spec.Overhead)
	}

	return result
}

// 在 prefilter 扩展点被调用
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
	// 计算Pod请求所需的资源,然后存储到 CycleState 中,方便后续其他插件获取数据
  cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
	return nil
}

也就是在 prefilter 这个扩展点的时候会获取到当前我们要调度的 Pod 需要的 Requests 资源,然后将其存入 CycleState。然后其他插件中如果需要用到这个数据就可以直接获取了,简单来说 CycleState 就是用于调度周期上下文数据传递共享的。

prefilter 扩展点注册的插件执行完成后,接着就是执行 filter 扩展点的插件了,同样默认启用的插件通过 getDefaultConfig() 函数进行了配置:

// pkg/scheduler/algorithmprovider/registry.go

func getDefaultConfig() *schedulerapi.Plugins {
	return &schedulerapi.Plugins{
		......
		Filter: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: nodeunschedulable.Name},
				{Name: noderesources.FitName},
				{Name: nodename.Name},
				{Name: nodeports.Name},
				{Name: nodeaffinity.Name},
				{Name: volumerestrictions.Name},
				{Name: tainttoleration.Name},
				{Name: nodevolumelimits.EBSName},
				{Name: nodevolumelimits.GCEPDName},
				{Name: nodevolumelimits.CSIName},
				{Name: nodevolumelimits.AzureDiskName},
				{Name: volumebinding.Name},
				{Name: volumezone.Name},
				{Name: podtopologyspread.Name},
				{Name: interpodaffinity.Name},
			},
		},
		......
  }
}

Filter

由于插件较多,这里我们也暂时挑选一个进行简单说明,例如我们可以看到在 Filter 中也注册了一个 noderesources.FitName 的插件,这其实就是上面的 prefilter 阶段使用过的 NodeResourcesFit 插件,这其实也说明了某些插件是可能在任何一个扩展点出现了,现在是在 filter 扩展点,那么我们重点要看的就是该插件的 Filter() 函数的实现:

// pkg/scheduler/framework/plugins/noderesources/fit.go

// 在 filter 扩展点调用。

// 检查一个节点是否有足够的资源,如cpu、内存、gpu 等来运行一个 Pod。
// 它返回一个资源不足的列表,如果为空,则说明该节点拥有 Pod 请求的所有资源。
func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	// 获取 prefilter 阶段存储在 CycleState 中的数据
  s, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}

	insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources, f.ignoredResourceGroups)
  // 不足的资源大小不为0
	if len(insufficientResources) != 0 {
		// 保留所有的失败原因
		failureReasons := make([]string, 0, len(insufficientResources))
		for _, r := range insufficientResources {
			failureReasons = append(failureReasons, r.Reason)
		}
    // 直接返回调度失败
		return framework.NewStatus(framework.Unschedulable, failureReasons...)
	}
	return nil
}

func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources, ignoredResourceGroups sets.String) []InsufficientResource {
	insufficientResources := make([]InsufficientResource, 0, 4)
  // 当前节点允许的 Pod 数量,默认110
	allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
  // 如果现有的 Pod 数+1(当前调度的Pod) > 节点允许的Pod数
  // 则提示太多Pods
	if len(nodeInfo.Pods)+1 > allowedPodNumber {
		insufficientResources = append(insufficientResources, InsufficientResource{
			v1.ResourcePods,
			"Too many pods",
			1,
			int64(len(nodeInfo.Pods)),
			int64(allowedPodNumber),
		})
	}
  // 没有配置Requests资源,则直接返回
	if podRequest.MilliCPU == 0 &&
		podRequest.Memory == 0 &&
		podRequest.EphemeralStorage == 0 &&
		len(podRequest.ScalarResources) == 0 {
		return insufficientResources
	}
  // 节点可分配的CPU不够
	if nodeInfo.Allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.Requested.MilliCPU {
		insufficientResources = append(insufficientResources, InsufficientResource{
			v1.ResourceCPU,
			"Insufficient cpu",
			podRequest.MilliCPU,
			nodeInfo.Requested.MilliCPU,
			nodeInfo.Allocatable.MilliCPU,
		})
	}
  // 可分配的内存不够
	if nodeInfo.Allocatable.Memory < podRequest.Memory+nodeInfo.Requested.Memory {
		insufficientResources = append(insufficientResources, InsufficientResource{
			v1.ResourceMemory,
			"Insufficient memory",
			podRequest.Memory,
			nodeInfo.Requested.Memory,
			nodeInfo.Allocatable.Memory,
		})
	}
  // 临时存储不够
	if nodeInfo.Allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.Requested.EphemeralStorage {
		insufficientResources = append(insufficientResources, InsufficientResource{
			v1.ResourceEphemeralStorage,
			"Insufficient ephemeral-storage",
			podRequest.EphemeralStorage,
			nodeInfo.Requested.EphemeralStorage,
			nodeInfo.Allocatable.EphemeralStorage,
		})
	}
  // 查看其他标量资源
	for rName, rQuant := range podRequest.ScalarResources {
    // 如果这个资源是应该被忽略的一种扩展扩展资源,则跳过检查
		if v1helper.IsExtendedResourceName(rName) {
      var rNamePrefix string
			if ignoredResourceGroups.Len() > 0 {
				rNamePrefix = strings.Split(string(rName), "/")[0]
			}
			if ignoredExtendedResources.Has(string(rName)) || ignoredResourceGroups.Has(rNamePrefix) {
				continue
			}
		}
    // 对应资源在节点上不足
		if nodeInfo.Allocatable.ScalarResources[rName] < rQuant+nodeInfo.Requested.ScalarResources[rName] {
			insufficientResources = append(insufficientResources, InsufficientResource{
				rName,
				fmt.Sprintf("Insufficient %v", rName),
				podRequest.ScalarResources[rName],
				nodeInfo.Requested.ScalarResources[rName],
				nodeInfo.Allocatable.ScalarResources[rName],
			})
		}
	}
  // 返回所有的不足资源信息
	return insufficientResources
}

上面的过滤函数整体比较简单易懂,拿到 prefilter 阶段存储在 CycleState 里面的 Pod 请求资源数据,然后和节点上剩余的可分配资源进行比较,如果没有设置 Requests 资源则直接返回,但是也会检查当前节点是否还有 Pod 数量(默认110),然后就是比较 CPU、内存、临时存储、标量资源等是否还有可分配的,所谓标量资源就是我们在定义 Pod 的时候可以自己指定一种资源来进行分配,比如 GPU,我们就可以当成一种标量资源进行分配,同样也需要判断节点上是否有可分配的标量资源。