所有文章

k8s源码分析-kubelet

本文以k8s 1.7.2为例,分析kubelet组件的启动流程。

入口

main函数定义在cmd/kubelet/kubelet.go,主要是收集配置信息:

  1. 将命令行传进来的参数解析到配置对象中。
  2. 创建kubelet服务对象,封装了配置对象。
  3. 根据配置创建相应的容器运行时客户端,如docker、rkt。
  4. 加载插件,如vlume插件、network插件等。
  5. 创建kubelet依赖对象。
  6. 调用aap.Run()准备启动kubelet。

其中创建docker客户端是这一句:

kubeletDeps, err := app.UnsecuredDependencies(kubeletServer)

我重点讲一下它是怎样创建docker客户端的。

创建docker客户端

我们知道k8s支持多种容器运行时(runtime),那我们怎样指定我们想要的运行时呢?就是kubelet的--container-runtime选项,目前只支持两种值:”docker”和”rkt”,默认为”docker”,以下就是根据我们指定的值创建运行时客户端:

cmd/kubelet/app/server.go:131

	var dockerClient libdocker.Interface
	// kubetypes.DockerContainerRuntime是常量,值为"docker"
	if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
		dockerClient = libdocker.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration,
			s.ImagePullProgressDeadline.Duration)
	} else {
		dockerClient = nil
	}

这里调用了一个关键函数ConnectToDockerOrDie(),定义如下: pkg/kubelet/dockershim/libdocker/client.go

func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout, imagePullProgressDeadline time.Duration) Interface {
	if dockerEndpoint == "fake://" {
		return NewFakeDockerClient()
	}
	client, err := getDockerClient(dockerEndpoint)
	if err != nil {
		glog.Fatalf("Couldn't connect to docker: %v", err)
	}
	glog.Infof("Start docker client with request timeout=%v", requestTimeout)
	return newKubeDockerClient(client, requestTimeout, imagePullProgressDeadline)
}

其实就是根据--container-runtime-endpoint选项的值来创建客户端,默认为”unix:///var/run/docker.sock”。 然后这个客户端对象被封装在kubelet.Dependencies对象中,后面会在启动kubelet时用到。

准备启动kubelet

此时进入cmd/kubelet/app/server.go:run()函数,该函数工作如下:

  1. 检查配置项。
  2. 创建kubeClietn用于与apiserver通信。
  3. 创建本地镜像文件查询器。
  4. 创建事件查询器,其实也是一个apiserver客户端,用来获取Pod状态等事件。
  5. 检查用户权限,如果当前用户不是root则报错退出。

构建kubelet并启动:

cmd/kubelet/app/server.go

func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
	k, err := builder(kubeCfg, kubeDeps, &kubeFlags.ContainerRuntimeOptions, kubeFlags.HostnameOverride, kubeFlags.NodeIP, kubeFlags.ProviderID, kubeFlags.CloudProvider, kubeFlags.CertDirectory, kubeFlags.RootDirectory)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}
...

最后启动kubelet,实际上也是一个http服务,监听配置中指定的端口,并阻塞。

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies) {
...
	// start the kubelet server
	if kubeCfg.EnableServer {
		go wait.Until(func() {
			k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
		}, 0, wait.NeverStop)
	}
...

容器运行时接口

前面说过k8s目前默认提供两种运行时,但k8s抽象出来一组标准的接口,我们只需提供相应的实现就可以添加我们想要的引擎。先看看原有的实现。

运行时的公共接口定义在pkg/kubelet/dockershim/libdocker/client.go文件中,代码较多,不贴了。

而Docker相应的实现如下 vendor/github.com/docker/docker/client/client.go

type Client struct {
	scheme string
	host string
	proto string
	addr string
	basePath string
	client *http.Client
	version string
	customHTTPHeaders map[string]string
	manualOverride bool
}

这里只列出它包含的字段,还有很多诸如创建容器、删除容器、更新容器的之类的函数。前面说的创建Docker客户端,其实就是这个结构体的实例。

与实际的容器引擎交互

对容器增删改查的具体逻辑都在vendor/github.com/docker/docker/client/包内。

我们看看它是怎么做到的,以创建容器为例: vendor/github.com/docker/docker/client/container_create.go

func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) {
...
	query := url.Values{}
	if containerName != "" {
		query.Set("name", containerName)
	}

	body := configWrapper{
		Config:           config,
		HostConfig:       hostConfig,
		NetworkingConfig: networkingConfig,
	}

	serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
...

这下明白了吧,我们对某个Pod的操作都会被组合成一个rest请求,然后发送给docker daemon,返回结果。


编写日期:2018-03-06