所有文章

k8s源码 - apiserver

本系列文章是基于kubernetes1.7版本的。

main函数

apiserver的入口定义在cmd/kube-apiserver/apiserver.go文件中:

func main() {
	rand.Seed(time.Now().UTC().UnixNano())

	// 创建一个默认配置对象
	s := options.NewServerRunOptions()
	
	// 构建命令行对象
	s.AddFlags(pflag.CommandLine)

    // 解析命令行参数
	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	// 如果指定了-version选项,则打印版本号然后退出
	verflag.PrintAndExitIfRequested()

	// 启动服务,并从管道中监听停止信号,该通道可能永远不会写入数据
	if err := app.Run(s, wait.NeverStop); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

这个main函数看起来是比较直观的,中文的注释是我加上去的,便于阅读。

初始化配置

在main函数中的第一行是为随机数产生器用当前时间提供了一个基数,避免多次启动产生相同的值,说明在这个组件中可能有些功能依赖了随数机。

s := options.NewServerRunOptions()显然是在创建一个配置对象,不过这个对象中只包含一些默认参数。

s.AddFlags(pflag.CommandLine)是构建一个命令行对象,怎么构建的呢,其实就是spflag.CommandLine这个对象中加了很多选项,并把自己的在很多字段以指针的方式注入到了pflag.CommandLine中,这些选项是经过分组的,如ETCD相关配置、apiserver相关的请求超时时间等。

紧接着flag.InitFlags()解析所有命令行参数,并通过指针把值放到配置对象s中。

其中flag这个包是k8s对开源项目github.com/spf13/pflag的封装,而这个项目又是基于golang标准库中的flag包开发的,并且封装了一些实用的函数,可以让你快速生成自己的命令行选项。

准备资源

然后调用到了Run()函数,定义在cmd/kube-apiserver/app/server.go文件中,这里代码较多就不全部贴出来了,为了保持文章的可读性,我会尽量减少函数展开的层数。

nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions)

上面代码判断是否安装在云主机上,如果是,并且指定了密钥文件,也是就--ssh-keyfile选项,则安装key到所有云主机的实例中,然后使用该密钥文件创建一个连接器nodeTunneler,用来访问其它节点。

kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)

这句是利用最初的配置对象runOptions创建用于启动apiserver的配置对象,与runOptions不同的是,它还包括了启动apiserver所需的资源。

该函数中做了以下几件事: 1. 根据用户指定的--admission-control列表加载相应的admission插件,这些插件的作用是用来过滤API请求的,可以用来作请求的合法性检测,比如我们对所有get csr的请求进行用户验证,如果该用户权限太低则拒绝该请求;也可以修改某个请求,比如为所有create deployments的请求加上scale=1等等。默认不加载任何规则。 1. 如果有必要的配置没有设置则为其设置默认值,如Service IP段、序列化缓存大小等。 1. 验证必要配置项,如果在此阶段有错误配置项,则所有错误信息将被打包返回。 1. 如果验证通过则开始连接ETCD,并启动同步信息的进程,该进程会一直监听apiserver的数据并同步到ETCD集群中。 1. 最后创建master.Config对象,也就是kubeAPIServerConfig变量,然后返回。

kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, apiExtensionsConfig.CRDRESTOptionsGetter)

这行代码是比较核心的,它的作用如下: 1. 首先对kubeAPIServerConfig做一个配置信息的检查,如果有空缺的字段则为其补全。 1. 安装以/api/v1开头的的REST API,安装API的函数是pkg/master/master.go文件内的InstallLegacyAPI()函数,而把RUI和Handler对应起来的是pkg/registry/core/rest/storage_core.go文件内的NewLegacyRESTStorage()函数,我们贴出来一小段:

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
...
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
...
		"replicationControllers":        controllerStorage.Controller,
		"replicationControllers/status": controllerStorage.Status,

		"services":        serviceRest.Service,
		"services/proxy":  serviceRest.Proxy,
		"services/status": serviceStatusStorage,
...

可见在k8s中,把Handler封装成了Storage,其实就是一个Storage对象负责一个REST API的增删改查。 3. 然后把最终的启动逻辑作为钩子,注册到kubeAPIServer中,但这时还没有启动,注册钩子的代码如下:

kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
		sharedInformers.Start(context.StopCh)
		return nil
	})

我们来说一下这个钩子函数,可以看到这里做了一个启动操作,这个sharedInformers是个集合,它包含了多个SharedInformer对象,也说是说这个启动操作其实是启动所有的SharedInformer,这个SharedInformer又是什么呢?它其实是一个缓存器,其它如kube-scheduler、kubelet等组件跟apiserver通信时都是通过缓存间接通信的,而这个SharedInformer一方面负责收集客户端和其它组件发来的请求,一方面负责通知该事件的关注者,使关注者可以做出相应的动作,还有就是它会定其把缓存中的数据同步到ETCD中。

以下是SharedInformer接口的定义,在shared_informer.go文件中:

type SharedInformer interface {
	AddEventHandler(handler ResourceEventHandler)
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	GetStore() Store
	GetController() Controller
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}

前两个函数都是向该事件添加一个监听器,也就是前面说的关注者,只不过使用第二个函数可以自定义同步数据的周期间隔时间。 GetStore()用来获取该事件类型的处理器,有新事件时该Store对应的逻辑会被调起。 Run()就是启动该SharedInformer的各个进程了。

启动服务

此时我们还在app.Run()函数中,在函数的最后才是真正启动Server的逻辑:

aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)

我们展开句尾的.Run(stopCh)看一下,这正是API服务启动的关键代码,它定义在: kubernetes/vendor/k8s.io/apiserver/pkg/server/serve.go

func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error {
	secureServer := &http.Server{
		Addr:           s.SecureServingInfo.BindAddress,
		Handler:        s.Handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig: &tls.Config{
			NameToCertificate: s.SecureServingInfo.SNICerts,
			MinVersion: tls.VersionTLS12,
			NextProtos: []string{"h2", "http/1.1"},
		},
	}

    if s.SecureServingInfo.Cert != nil {
		secureServer.TLSConfig.Certificates = []tls.Certificate{*s.SecureServingInfo.Cert}
	}
	
    // 中间省略...
	
	s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh)
}
  1. 函数的开头就是定义一个Server,这在golang中是一个标准的Server.
  2. 其中的s.Handler是所有REST API对应的handler,golang中的handler相当于java中的servlet,或者是Spring VMC中的controller.
  3. 读取配置中提供的证书、CA、私钥等,注入到Server对象中。
  4. 最后一行是启动Server,里面是非阻塞式启动的,逻辑较简单,这里就不再展开了。
  5. 调用刚才注册的启动钩子函数,启动各个SharedInformer
  6. 监听stopCh通道并阻塞主进程。

到这里就是apiserver启动的全部流程了,下次细讲一下apiserver的工作流程,以及它如何处理Client发来的创建、更新、删除等请求。


编写日期:2017-12-23