<aside> ⏰

本文主要介绍如何自定义调度器

</aside>

插件注册

其实要实现一个调度框架的插件,并不难,我们只要实现对应的调度框架扩展点即可,然后将插件注册到调度器中即可,前面章节中我们已经知道了调度器会根据调度框架在一些扩展点上默认启用一些插件,如果我们自己来自定义一个插件的话,如何让调度器识别我们的代码呢?

其实默认的调度器已经为我们预留了这样的入口,在 kube-scheduler 的源文件 cmd/kube-scheduler/app/server.go 中有一个 NewSchedulerCommand 入口函数,其中的参数是一个类型为 Option 的列表,而这个 Option 恰好就是一个用于插件配置的回调函数:

// cmd/kube-scheduler/app/server.go

// Option configures a framework.Registry.
*type* Option *func*(framework.Registry) error

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
*func* NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
  opts, err := options.NewOptions()
  ......
}

我们可以将自定义的插件通过 Option 传给 NewSchedulerCommand 函数,在调度器初始化的时候会调用 WithFrameworkOutOfTreeRegistry 函数传递给调度器的 frameworkOutOfTreeRegistry 属性,该属性包含了我们自定义的 out-of-tree 插件,最终会和内置的 in-tree 插件进行合并。

// pkg/scheduler/scheduler.go

// 该函数用来设置 out-of-tree 插件,这些插件会被添加到默认的 registry 中去。
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
	return func(o *schedulerOptions) {
		o.frameworkOutOfTreeRegistry = registry
	}
}

所以如果是我们自定义插件的话是不用从头开始去实现的,可以直接使用默认的调度器代码即可,以默认调度器的入口来作为我们的函数入口,并且传入我们自己实现的插件作为参数即可。

在调度器初始化的文件中就提供了一个 WithPlugin 函数可以用来注册插件,最后返回一个 Option 实例,我们可以借助这个函数来给调度器传递自定义插件的参数:

// cmd/kube-scheduler/app/server.go

// WithPlugin 根据插件名称和工厂方法创建一个 Option
// 可以用这个函数来注册 out-of-tree 插件。
func WithPlugin(name string, factory runtime.PluginFactory) Option {
	return func(registry runtime.Registry) error {
		return registry.Register(name, factory)
	}
}

所以最终我们的入口函数如下所示:

func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewSchedulerCommand(app.WithPlugin(plugins.Name, plugins.New))

	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

其中 app.WithPlugin(sample.Name, sample.New) 就是我们接下来要实现的插件,从 WithPlugin 函数的参数也可以看出我们这里的 sample.New 必须是一个 runtime.PluginFactory 类型的值,而 PluginFactory 的定义就是一个函数:

type PluginFactory = func(configuration runtime.Object, f v1alpha1.FrameworkHandle) (v1alpha1.Plugin, error)

所以 sample.New 实际上就是上面的这个函数,在这个函数中我们可以获取到插件中的一些数据然后进行逻辑处理即可。

另外需要注意的是 Kubernetes 源码里面的 go mod 管理不是很标准,可以直接使用下面我提供的 go.mod 文件:

module github.com/cnych/scheduler-demo

go 1.15

require (
	github.com/spf13/pflag v1.0.5
	k8s.io/api v0.19.9
	k8s.io/apimachinery v0.19.9
	k8s.io/component-base v0.19.9
  k8s.io/klog/v2 v2.2.0
	k8s.io/kube-scheduler v0.19.9 // indirect
	k8s.io/kubernetes v1.19.9
)

replace (
	k8s.io/api => k8s.io/api v0.19.9
	k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.19.9
	k8s.io/apimachinery => k8s.io/apimachinery v0.19.9
	k8s.io/apiserver => k8s.io/apiserver v0.19.9
	k8s.io/cli-runtime => k8s.io/cli-runtime v0.19.9
	k8s.io/client-go => k8s.io/client-go v0.19.9
	k8s.io/cloud-provider => k8s.io/cloud-provider v0.19.9
	k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.19.9
	k8s.io/code-generator => k8s.io/code-generator v0.19.9
	k8s.io/component-base => k8s.io/component-base v0.19.9
	k8s.io/cri-api => k8s.io/cri-api v0.19.9
	k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.19.9
	k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.19.6
	k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.19.9
	k8s.io/kube-proxy => k8s.io/kube-proxy v0.19.9
	k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.19.9
	k8s.io/kubectl => k8s.io/kubectl v0.19.9
	k8s.io/kubelet => k8s.io/kubelet v0.19.9
	k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.19.9
	k8s.io/metrics => k8s.io/metrics v0.19.9
	k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.19.9
)

插件实现

插件实现如下所示,我们这里只是简单获取下数据打印日志,如果你有实际需求的可以根据获取的数据就行处理即可,我们这里只是实现了 PreFilterFilter 扩展点,其他的可以用同样的方式来扩展即可: