所有文章

k8s源码 - 数据流程

本文从源码层面解释kubernetes从接收创建Pod的指令到实际创建Pod的整个过程。

1.1 接收请求

监听的任务是由kube-apiserver这个组件来完成的,它实际上是一个WEB服务,一般是以双向TLS认证方式启动的,所以在启动时需要提供证书、私钥、客户端的CA证书和CA私钥,当然也支持HTTP的方式,启动后就开始监听用户请求了。

1.2 数据存储

kube-apiserver中的WEB服务在启动时注册了很多的Handler,golang中的Handler相当于Java中的servlet或者是Spring中的Controller,是对某一业务逻辑的封装,通俗点说,一个Handler负责对一个URI请求的处理,而在kube-apiserver中,把Handler封装成了一个叫Store的东西,怎么封装的呢?比如/api/v1/namespaces/{namespace}/pods这个URI对应了一个叫PodStorage的Store,这个Store中包含了对/api/v1/namespaces/{namespace}/pods的多个Handler,这些Handler有的是处理创建请求,有的是处理删除请求等等,代表了对一种资源的操作集。

1.2.1 Store对象

我们来看看这个Store的定义: staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go

type Store struct {
	Copier runtime.ObjectCopier
	// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
	NewFunc func() runtime.Object
...
	CreateStrategy rest.RESTCreateStrategy
	AfterCreate ObjectFunc
	UpdateStrategy rest.RESTUpdateStrategy
	AfterUpdate ObjectFunc
	DeleteStrategy rest.RESTDeleteStrategy
	AfterDelete ObjectFunc
...
	// Storage is the interface for the underlying storage for the resource.
	Storage storage.Interface
	// Called to cleanup clients used by the underlying Storage; optional.
	DestroyFunc func() entries. default value.
	WatchCacheSize *int
}
...
func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) {
...
func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
...

因为每种资源所需要的操作不一样,所以Store中只包含了基本的通用的操作,作为一个基础类。

在Store的定义中有一个Storage storage.Interface字段,Store中的创建、更新、删除等操作都是调用的这个对象中的方法,也就是说这个Storage对象包含了一组更低级的操作,可以看作是数据的持久化层,这些操作都是通用的,而Store可以用Storage中的功能组合出具有不同功能的控制层对象,也就是Store对象啊,,好吧我们距离真相又进了一步,那这个storage.Interface又是怎样实现的呢?

1.2.2 Interface的实现

Storage是一个名为Interface的接口,里面定义了一些数据持久层的操作,这里就不贴出来了,我们更关心它的实现,我们先来看看Interface实例的创建吧,它的创建工作是由一个工厂类负责的: staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	switch c.Type {
	case storagebackend.StorageTypeETCD2:
		return newETCD2Storage(c)
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		// TODO: We have the following features to implement:
		// - Support secure connection by using key, cert, and CA files.
		// - Honor "https" scheme to support secure connection in gRPC.
		// - Support non-quorum read.
		return newETCD3Storage(c)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}

好吧,看到这里就彻底明白了,根据配置中指定的存储服务创建不同的Storage对象,并且目前只支持ETCD2和ETCD3两种存储服务,持久层所做的增删改查就是对ETCD中数据的增删改查,总结一下Storage对象也是就Interface被创建的过程: 1. 根据配置中提供的存储服务名称选择进入不相应的创建函数,也就是上面的逻辑。 1. 从配置对象中取出我们启动kube-apiserver时指定的ETCD的证书证书、私钥和CA证书(如果开启了双向证),用于apiserver和ETCD通信,用这些信自生成一个配置对象。 1. 用配置对象创建一个ETCD的客户端。 1. 创建一个k8s.io/apiserver/pkg/storage/etcd3.store对象并让其持有这个ETCD客户端 1. 将这个对象作为Interface返回。

那么这个k8s.io/apiserver/pkg/storage/etcd3.store对象就成了关键啦,因为它现了Interface的所有接口函数,我们看看它的Create()函数据是怎样实现的: vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go

// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
		return errors.New("resourceVersion should not be set on objects to be created")
	}
	data, err := runtime.Encode(s.codec, obj)
	if err != nil {
		return err
	}
	key = path.Join(s.pathPrefix, key)

	opts, err := s.ttlOpts(ctx, int64(ttl))
	if err != nil {
		return err
	}

	newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key),
	).Then(
		clientv3.OpPut(key, string(newData), opts...),
	).Commit()
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return storage.NewKeyExistsError(key, 0)
	}

	if out != nil {
		putResp := txnResp.Responses[0].GetResponsePut()
		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
	}
	return nil
}

其中txnResp, err := s.client.KV.Txn(ctx).If(这一行就是调用ETCD客户端了。

2.1 调度

调度工作是由kube-scheduler负责的,而且不受kube-apiserver控制,kube-scheduler通过kube-apiserver的REST API不断地检查是否有新的且还没有被调度的Pod,如果如有则根据配置中的调度算法为其绑定到某个节点,绑定的过程也是通过REST API将信息写入到kube-apiserver中的,对应的REST API定义如下: pkg/registry/core/rest/storage_core.go

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
...
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.Binding,
...

最后的两个API就是了。

下面是REST API/api/v1/namespaces/{my-ns}/pods对应的Store对象的定义: pkg/registry/core/pod/storage/storage.go

// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
...
	return PodStorage{
		Pod:         &REST{store, proxyTransport},
		Binding:     &BindingREST{store: store},
		Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
		Status:      &StatusREST{store: &statusStore},
		Log:         &podrest.LogREST{Store: store, KubeletConn: k},
		Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
		Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
		Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
		PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
	}

其中的Binding字段就是在创建绑定逻辑的Store了。

3.1 创建

最后的创建工作由各个节点上的kubelet组件负责,工作原理同kube-scheduler一样,通过REST API从kube-apiserver循环监听是否有新创建的并且已经被绑定到自己节点的Pod,如果有则在自己的节点上创建相应的Docker容器并设置容器的网络、端口转发、内存和CPU限制等,然后把结果用REST API通知给kube-apiserver


编写日期:2017-12-30