所有文章

k8s源码 - scheduler

本文以kubenetes v7.2为例,说明kube-scheduler组件的启动流程与工作原理

入口

scheduler的main函数定义在plugin/cmd/kube-scheduler/scheduler.go中,main函数代码也比较清晰: 1. 创建默认配置对象 1. 将配置对象的指针传给命令行解析器,然后命令行解析器把解析到的各选项的值写入到配置对象中 1. 如果用户指定了version选项则打印版本信息并退出 1. 将配置对象传给Run()函数,然后就开始启动Scheduler了

创建客户端

kubeClient, leaderElectionClient, err := createClients(s)

Run()函数一开始先创建了一个kubernetes的客户端,用来连接kube-apiserver组件以获取集群信息,这个客户端对象会被包含在Scheduler对象中。

创建缓存更新器

informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
podInformer := factory.NewPodInformer(kubeClient, 0)

然后又根据客户端创建出来两个Informer对象,它跟客户端在一个包中,其作用是允许用户提供一些事件监听器(watcher),然后它有一个Run方法,启动以后会一直循环从kube-apiserver中查询我们想要的信息,比如节点状态、新增Pod等等,如果有变化就会触发我们注册的相应的监听器对应的动作,然后本地有一个缓存对象,用来存放这些查询到的信息,这时只是创建,它们的Run方法还没有被调用。

创建Scheduler

sched, err := CreateScheduler(

Scheduler对象的创建与另外两个对象密切相关,一个是Config,与Scheduler定义在一个文件中plugin/pkg/scheduler/scheduler.go,另一个是ConfigFactory,定义在plugin/pkg/scheduler/factory/factory.go.getNextPod(),它们的关系大概为: 1. ConfigFactory包含了最底层的操作函数,而且功能很丰富,由于它代码较多就不贴出来了,它的主要工作是维护一些列表,比如等待调度的Pod、已调度的Pod、集群节点列表、PV/PVC列表等,并不断更新,当然还包括向队列增删改查的函数,这些函数由Informer提供。 1. Scheduler是对ConfigFactory的高级抽象,相对包含的函数少一些,因为它封装出了更高级的功能,使用起来更简单。 1. 而Config是Scheduler中的一个字段,Config没有函数只有一些字段,主要的作用是包含了Scheduler运行时需要的资源,这个Config对象包含的元素如下:

type Config struct {
	SchedulerCache schedulercache.Cache
	Ecache     *core.EquivalenceCache
	NodeLister algorithm.NodeLister
	Algorithm  algorithm.ScheduleAlgorithm
	Binder     Binder
	PodConditionUpdater PodConditionUpdater
	PodPreemptor PodPreemptor
	NextPod func() *v1.Pod
	WaitForCacheSync func() bool
	Error func(*v1.Pod, error)
	Recorder record.EventRecorder
	StopEverything chan struct{}
}

其中NodeLister是集群中所有节点的列表,Binder用来将指定Pod绑定到某一主机上。

Algorithm自然就是调度算法了, 默认的算法定义在plugin/pkg/scheduler/algorithm/priorities/包内,而ConfigFactory通过调用plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go.defaultPriorities()函数将这些默认的算法封装起来并最终注册到Config对象中。

NextPod用来获取下一个等待调度的Pod,它其实调用的是ConfigFactory中的getNextPod()函数。

调度器中几个重要的对象的创建大概就这么多,后面就开始启动了。

启动Metrics服务

go startHTTP(s)

Metrics是一个用于查询调试信息和运行状态信息的REST API Server,它定义如下:

func startHTTP(s *options.SchedulerServer) {
	mux := http.NewServeMux()
	healthz.InstallHandler(mux)
	if s.EnableProfiling {
		mux.HandleFunc("/debug/pprof/", pprof.Index)
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
...
mux.Handle("/metrics", prometheus.Handler())
...

服务的默认端口是10251,所以我们可以这样查看Scheduler的运行时信息:

curl -i localhost:10251/metrics

启动缓存更新器

go podInformer.Informer().Run(stop)
informerFactory.Start(stop)

启动Scheduler

sched.Run()

终于看到启动Schduler的代码了,看看它是怎么启动的: plugin/pkg/scheduler/scheduler.go

func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

wait.Until()这个函数的作用就是循环调用,第一个参数是要调用的函数,第二个是间隔时间,第三个是一个chan,如果该通道被关闭则停止循环。

调度Pod

其中sched.scheduleOne()函数中的代码也比较多,我们就直说它的功能:

  1. 执行sched.config.NextPod()获取下一个需要被调度的Pod,sched.config就是我们前面说到的Config对象,这时候就它发挥作用的时候了。
  2. 通过启动时指定的调度算法得出该Pod应该被调度到哪节点上,关键代码是host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
  3. 绑定Pod和主机 err := sched.bind(assumedPod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}, Target: v1.ObjectReference{ Kind: "Node", Name: suggestedHost, }, })
  4. 将绑定信息写回到kube-apiserver,这一步是在上面的sched.bind()这个函数上完成的,关键代码是err := sched.config.Binder.Bind(b)Bind()是一个接口函数,它的实现在ConfigFactory对象中,定义如下: // Bind just does a POST binding RPC. func (b *binder) Bind(binding *v1.Binding) error { glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) } 最后将本次的操作信息写入到Metrics服务中。

总结

从调度器的实现上来看,它与apiserver之间的解耦合做的非常彻底,完全没有依赖,我们甚至可以通过REST API手动来完成Pod与主机的绑定。文中提到客户端是一个单独的项目在Github上,你可以把它引用在自己的项目中来完成与k8s集群的交互,当然可以模仿调度器用Informer的方式实现更高级的功能。

以上就是Scheduler组件启动的全总流程,希望你读完本文后对Scheduler有一个比较清晰的认识,以更好的使用你的k8s集群。


编写日期:2018-01-14