k8s源码分析-数据流程
本文从源码层面解释kubernetes从接收创建Pod的指令到实际创建Pod的整个过程。
1.1 接收请求
监听的任务是由kube-apiserver
这个组件来完成的,它实际上是一个WEB服务,一般是以双向TLS认证方式启动的,所以在启动时需要提供证书、私钥、客户端的CA证书和CA私钥,当然也支持HTTP的方式,启动后就开始监听用户请求了。
1.2 数据存储
kube-apiserver
中的WEB服务在启动时注册了很多的Handler,golang中的Handler相当于Java中的servlet或者是Spring中的Controller,是对某一业务逻辑的封装,通俗点说,一个Handler负责对一个URI请求的处理,而在kube-apiserver
中,把Handler封装成了一个叫Store的东西,怎么封装的呢?比如/api/v1/namespaces/{namespace}/pods
这个URI对应了一个叫PodStorage
的Store,这个Store中包含了对/api/v1/namespaces/{namespace}/pods
的多个Handler,这些Handler有的是处理创建请求,有的是处理删除请求等等,代表了对一种资源的操作集。
1.2.1 Store对象
我们来看看这个Store的定义:
staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
type Store struct {
Copier runtime.ObjectCopier
// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
NewFunc func() runtime.Object
...
CreateStrategy rest.RESTCreateStrategy
AfterCreate ObjectFunc
UpdateStrategy rest.RESTUpdateStrategy
AfterUpdate ObjectFunc
DeleteStrategy rest.RESTDeleteStrategy
AfterDelete ObjectFunc
...
// Storage is the interface for the underlying storage for the resource.
Storage storage.Interface
// Called to cleanup clients used by the underlying Storage; optional.
DestroyFunc func() entries. default value.
WatchCacheSize *int
}
...
func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) {
...
func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
...
因为每种资源所需要的操作不一样,所以Store中只包含了基本的通用的操作,作为一个基础类。
在Store的定义中有一个Storage storage.Interface
字段,Store中的创建、更新、删除等操作都是调用的这个对象中的方法,也就是说这个Storage对象包含了一组更低级的操作,可以看作是数据的持久化层,这些操作都是通用的,而Store可以用Storage中的功能组合出具有不同功能的控制层对象,也就是Store对象啊,,好吧我们距离真相又进了一步,那这个storage.Interface
又是怎样实现的呢?
1.2.2 Interface的实现
Storage
是一个名为Interface的接口,里面定义了一些数据持久层的操作,这里就不贴出来了,我们更关心它的实现,我们先来看看Interface实例的创建吧,它的创建工作是由一个工厂类负责的:
staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return newETCD2Storage(c)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
好吧,看到这里就彻底明白了,根据配置中指定的存储服务创建不同的Storage对象,并且目前只支持ETCD2和ETCD3两种存储服务,持久层所做的增删改查就是对ETCD中数据的增删改查,总结一下Storage对象也是就Interface被创建的过程:
- 根据配置中提供的存储服务名称选择进入不相应的创建函数,也就是上面的逻辑。
- 从配置对象中取出我们启动
kube-apiserver
时指定的ETCD的证书证书、私钥和CA证书(如果开启了双向证),用于apiserver和ETCD通信,用这些信自生成一个配置对象。 - 用配置对象创建一个ETCD的客户端。
- 创建一个
k8s.io/apiserver/pkg/storage/etcd3.store
对象并让其持有这个ETCD客户端 - 将这个对象作为Interface返回。
那么这个k8s.io/apiserver/pkg/storage/etcd3.store
对象就成了关键啦,因为它现了Interface的所有接口函数,我们看看它的Create()
函数据是怎样实现的:
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
key = path.Join(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
其中txnResp, err := s.client.KV.Txn(ctx).If(
这一行就是调用ETCD客户端了。
2.1 调度
调度工作是由kube-scheduler
负责的,而且不受kube-apiserver
控制,kube-scheduler
通过kube-apiserver
的REST API不断地检查是否有新的且还没有被调度的Pod,如果如有则根据配置中的调度算法为其绑定到某个节点,绑定的过程也是通过REST API将信息写入到kube-apiserver
中的,对应的REST API定义如下:
pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
...
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
...
最后的两个API就是了。
下面是REST API/api/v1/namespaces/{my-ns}/pods
对应的Store对象的定义:
pkg/registry/core/pod/storage/storage.go
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
...
return PodStorage{
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
Eviction: newEvictionStorage(store, podDisruptionBudgetClient),
Status: &StatusREST{store: &statusStore},
Log: &podrest.LogREST{Store: store, KubeletConn: k},
Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
Exec: &podrest.ExecREST{Store: store, KubeletConn: k},
Attach: &podrest.AttachREST{Store: store, KubeletConn: k},
PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
}
其中的Binding
字段就是在创建绑定逻辑的Store了。
3.1 创建
最后的创建工作由各个节点上的kubelet
组件负责,工作原理同kube-scheduler
一样,通过REST API从kube-apiserver
循环监听是否有新创建的并且已经被绑定到自己节点的Pod,如果有则在自己的节点上创建相应的Docker容器并设置容器的网络、端口转发、内存和CPU限制等,然后把结果用REST API通知给kube-apiserver
。