<aside> ⏰
本文主要介绍如何自定义调度器
</aside>
其实要实现一个调度框架的插件,并不难,我们只要实现对应的调度框架扩展点即可,然后将插件注册到调度器中即可,前面章节中我们已经知道了调度器会根据调度框架在一些扩展点上默认启用一些插件,如果我们自己来自定义一个插件的话,如何让调度器识别我们的代码呢?
其实默认的调度器已经为我们预留了这样的入口,在 kube-scheduler
的源文件 cmd/kube-scheduler/app/server.go
中有一个 NewSchedulerCommand
入口函数,其中的参数是一个类型为 Option
的列表,而这个 Option
恰好就是一个用于插件配置的回调函数:
// cmd/kube-scheduler/app/server.go
// Option configures a framework.Registry.
*type* Option *func*(framework.Registry) error
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
*func* NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
opts, err := options.NewOptions()
......
}
我们可以将自定义的插件通过 Option 传给 NewSchedulerCommand 函数,在调度器初始化的时候会调用 WithFrameworkOutOfTreeRegistry 函数传递给调度器的 frameworkOutOfTreeRegistry 属性,该属性包含了我们自定义的 out-of-tree 插件,最终会和内置的 in-tree 插件进行合并。
// pkg/scheduler/scheduler.go
// 该函数用来设置 out-of-tree 插件,这些插件会被添加到默认的 registry 中去。
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
return func(o *schedulerOptions) {
o.frameworkOutOfTreeRegistry = registry
}
}
所以如果是我们自定义插件的话是不用从头开始去实现的,可以直接使用默认的调度器代码即可,以默认调度器的入口来作为我们的函数入口,并且传入我们自己实现的插件作为参数即可。
在调度器初始化的文件中就提供了一个 WithPlugin
函数可以用来注册插件,最后返回一个 Option
实例,我们可以借助这个函数来给调度器传递自定义插件的参数:
// cmd/kube-scheduler/app/server.go
// WithPlugin 根据插件名称和工厂方法创建一个 Option
// 可以用这个函数来注册 out-of-tree 插件。
func WithPlugin(name string, factory runtime.PluginFactory) Option {
return func(registry runtime.Registry) error {
return registry.Register(name, factory)
}
}
所以最终我们的入口函数如下所示:
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewSchedulerCommand(app.WithPlugin(plugins.Name, plugins.New))
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// utilflag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
其中 app.WithPlugin(sample.Name, sample.New)
就是我们接下来要实现的插件,从 WithPlugin
函数的参数也可以看出我们这里的 sample.New
必须是一个 runtime.PluginFactory
类型的值,而 PluginFactory
的定义就是一个函数:
type PluginFactory = func(configuration runtime.Object, f v1alpha1.FrameworkHandle) (v1alpha1.Plugin, error)
所以 sample.New
实际上就是上面的这个函数,在这个函数中我们可以获取到插件中的一些数据然后进行逻辑处理即可。
另外需要注意的是 Kubernetes 源码里面的 go mod 管理不是很标准,可以直接使用下面我提供的 go.mod 文件:
module github.com/cnych/scheduler-demo
go 1.15
require (
github.com/spf13/pflag v1.0.5
k8s.io/api v0.19.9
k8s.io/apimachinery v0.19.9
k8s.io/component-base v0.19.9
k8s.io/klog/v2 v2.2.0
k8s.io/kube-scheduler v0.19.9 // indirect
k8s.io/kubernetes v1.19.9
)
replace (
k8s.io/api => k8s.io/api v0.19.9
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.19.9
k8s.io/apimachinery => k8s.io/apimachinery v0.19.9
k8s.io/apiserver => k8s.io/apiserver v0.19.9
k8s.io/cli-runtime => k8s.io/cli-runtime v0.19.9
k8s.io/client-go => k8s.io/client-go v0.19.9
k8s.io/cloud-provider => k8s.io/cloud-provider v0.19.9
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.19.9
k8s.io/code-generator => k8s.io/code-generator v0.19.9
k8s.io/component-base => k8s.io/component-base v0.19.9
k8s.io/cri-api => k8s.io/cri-api v0.19.9
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.19.9
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.19.6
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.19.9
k8s.io/kube-proxy => k8s.io/kube-proxy v0.19.9
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.19.9
k8s.io/kubectl => k8s.io/kubectl v0.19.9
k8s.io/kubelet => k8s.io/kubelet v0.19.9
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.19.9
k8s.io/metrics => k8s.io/metrics v0.19.9
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.19.9
)
插件实现如下所示,我们这里只是简单获取下数据打印日志,如果你有实际需求的可以根据获取的数据就行处理即可,我们这里只是实现了 PreFilter
、Filter
扩展点,其他的可以用同样的方式来扩展即可: