关注「开源Linux」,选择星标
回复「学习」,有我为您筛选的学习资料~
全文大纲:
K8s 组件启动过程
kubectl(命令行客户端)
kube-apiserver
写入 etcd
Initializers
Control loops(控制循环)
Kubelet
本文试图回答以下问题:,
我们需要找出这个问题:
了解 K8s 几个核心组件的启动过程,他们分别做了什么,
请求从客户端开始 pod ready 整个过程。
0 K8s 组件启动过程
首先,看看几个核心组件的启动过程。
0.1 kube-apiserver 启动
调用栈
创建命令行(kube-apiserver
)入口:
main//cmd/kube-apiserver/apiserver.go |-cmd:=app.NewAPIServerCommand()//cmd/kube-apiserver/app/server.go ||-RunE:=func(){ |Complete() ||-ApplyAuthorization(s.Authorization) ||-ifTLS: |ServiceAccounts.KeyFiles=[]string{CertKey.KeyFile} |Validate() |Run(completedOptions,handlers)//核心逻辑 |} |-cmd.Execute()
kube-apiserver
启动后,将执行到其中Run()
方法:
Run()//cmd/kube-apiserver/app/server.go |-server=CreateServerChain() ||-CreateKubeAPIServerConfig() |||-buildGenericConfig ||||-genericapiserver.NewConfig()//staging/src/k8s.io/apiserver/pkg/server/config.go |||||-return&Config{ ||||Serializer:codecs, ||||BuildHandlerChainFunc:DefaultBuildHandlerChain,//注册handler ||||} |||| ||||-OpenAPIConfig=DefaultOpenAPIConfig()//OpenAPIschema ||||-kubeapiserver.NewStorageFactoryConfig()//etcd相关配置 ||||-APIResourceConfig=genericConfig.MergedResourceConfig ||||-storageFactoryConfig.Complete(s.Etcd) ||||-storageFactory=completedStorageFactoryConfig.New() ||nbsp; | |-s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
| | | |-BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
| | | |-pluginInitializers, admissionPostStartHook = admissionConfig.New()
| | |
| | |-capabilities.Initialize
| | |-controlplane.ServiceIPRange()
| | |-config := &controlplane.Config{}
| | |-AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook)
| | |-ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuer
| | |-ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
| | |-ServiceAccountPublicKeys = pubKeys
| |
| |-createAPIExtensionsServer
| |-CreateKubeAPIServer
| |-createAggregatorServer // cmd/kube-apiserver/app/aggregator.go
| | |-aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer) // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
| | | |-apiGroupInfo := NewRESTStorage()
| | | |-GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
| | | |-InstallAPIGroups
| | | |-openAPIModels := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
| | | |-for apiGroupInfo := range apiGroupInfos {
| | | | s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)
| | | | s.DiscoveryGroupManager.AddGroup(apiGroup)
| | | | s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
| | | |
| | | |-GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
| | | |-GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
| | | |-
| | |-
|-prepared = server.PrepareRun() // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
| |-GenericAPIServer.AddPostStartHookOrDie
| |-GenericAPIServer.PrepareRun
| | |-routes.OpenAPI{}.Install()
| | |-registerResourceHandlers // staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
| | |-POST: XX
| | |-GET: XX
| |
| |-openapiaggregator.BuildAndRegisterAggregator()
| |-openapiaggregator.NewAggregationController()
| |-preparedAPIAggregator{}
|-prepared.Run() // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
|-s.runnable.Run()
一些重要步骤
。Server aggregation(聚合)是一种支持多 apiserver 的方式,其中 包括了一个 generic apiserver[3],作为默认实现。
,保存到 apiserver 的 Config.OpenAPIConfig 字段[4]。
遍历 schema 中的所有 API group,为每个 API group 配置一个 storage provider[5], 这是一个通用 backend 存储抽象层。
遍历每个 group 版本,为每个 HTTP route 配置 REST mappings[6]。稍后处理请求时,就能将 requests 匹配到合适的 handler。
0.2 controller-manager 启动
调用栈
NewDeploymentController
NewReplicaSetController
0.3 kubelet 启动
调用栈
main // cmd/kubelet/kubelet.go
|-NewKubeletCommand // cmd/kubelet/app/server.go
|-Run // cmd/kubelet/app/server.go
|-initForOS // cmd/kubelet/app/server.go
|-run // cmd/kubelet/app/server.go
|-initConfigz // cmd/kubelet/app/server.go
|-InitCloudProvider
|-NewContainerManager
|-ApplyOOMScoreAdj
|-PreInitRuntimeService
|-RunKubelet // cmd/kubelet/app/server.go
| |-k = createAndInitKubelet // cmd/kubelet/app/server.go
| | |-NewMainKubelet
| | | |-watch k8s Service
| | | |-watch k8s Node
| | | |-klet := &Kubelet{}
| | | |-init klet fields
| | |
| | |-k.BirthCry()
| | |-k.StartGarbageCollection()
| |
| |-startKubelet(k) // cmd/kubelet/app/server.go
| |-go k.Run() // -> pkg/kubelet/kubelet.go
| | |-go cloudResourceSyncManager.Run()
| | |-initializeModules
| | |-go volumeManager.Run()
| | |-go nodeLeaseController.Run()
| | |-initNetworkUtil() // setup iptables
| | |-go Until(PerformPodKillingWork, 1*time.Second, neverStop)
| | |-statusManager.Start()
| | |-runtimeClassManager.Start
| | |-pleg.Start()
| | |-syncLoop(updates, kl) // pkg/kubelet/kubelet.go
| |
| |-k.ListenAndServe
|
|-go http.ListenAndServe(healthz)
0.4 小结
以上核心组件启动完成后,就可以从命令行发起请求创建 pod 了。
1 kubectl(命令行客户端)
1.0 调用栈概览
NewKubectlCommand // staging/src/k8s.io/kubectl/pkg/cmd/cmd.go
|-matchVersionConfig = NewMatchVersionFlags()
|-f = cmdutil.NewFactory(matchVersionConfig)
| |-clientGetter = matchVersionConfig
|-NewCmdRun(f) // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
| |-Complete // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
| |-Run(f) // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
| |-validate parameters
| |-generators = GeneratorFn("run")
| |-runObj = createGeneratedObject(generators) // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
| | |-obj = generator.Generate() // -> staging/src/k8s.io/kubectl/pkg/generate/versioned/run.go
| | | |-get pod params
| | | |-pod = v1.Pod{params}
| | | |-return &pod
| | |-mapper = f.ToRESTMapper() // -> staging/src/k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go
| | | |-f.clientGetter.ToRESTMapper() // -> staging/src/k8s.io/kubectl/pkg/cmd/util/factory_client_access.go
| | | |-f.Delegate.ToRESTMapper() // -> staging/src/k8s.io/kubectl/pkg/cmd/util/kubectl_match_version.go
| | | |-ToRESTMapper // -> staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
| | | |-delegate() // staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
| | |--actualObj = resource.NewHelper(mapping).XX.Create(obj)
| |-PrintObj(runObj.Object)
|
|-NewCmdEdit(f) // kubectl edit 命令
|-NewCmdScale(f) // kubectl scale 命令
|-NewCmdCordon(f) // kubectl cordon 命令
|-NewCmdUncordon(f)
|-NewCmdDrain(f)
|-NewCmdTaint(f)
|-NewCmdExecute(f)
|-...
1.1 参数验证(validation)和资源对象生成器(generator)
参数验证
敲下 kubectl
命令后,它首先会做一些客户端侧的验证。如果命令行参数有问题,例如,镜像名为空或格式不对[7], 这里会直接报错,从而避免了将明显错误的请求发给 kube-apiserver,减轻了后者的压力。
此外,kubectl 还会检查其他一些配置,例如
是否需要记录(record)这条命令(用于 rollout 或审计)
是否只是测试执行(
--dry-run
)
创建 HTTP 请求
所有都需要与 kube-apiserver 交互,后者会进一步和 etcd 通信。
因此,验证通过之后,kubectl 接下来会创建发送给 kube-apiserver 的 HTTP 请求。
Generators
generator[8](文档[9]) ,它封装了资源的序列化(serialization)操作。例如,创建 pod 时用到的 generator 是 BasicPod[10]:
// staging/src/k8s.io/kubectl/pkg/generate/versioned/run.go
type BasicPod struct{}
func (BasicPod) ParamNames() []generate.GeneratorParam {
return []generate.GeneratorParam{
{Name: "labels", Required: false},
{Name: "name", Required: true},
{Name: "image", Required: true},
...
}
}
每个 generator 都实现了一个 Generate()
方法,用于生成一个该资源的运行时对象(runtime object)。对于 BasicPod
,其实现[11]为:
func (BasicPod) Generate(genericParams map[string]interface{}) (runtime.Object, error) {
pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{ // metadata 字段
Name: name,
Labels: labels,
...
},
Spec: v1.PodSpec{ // spec 字段
ServiceAccountName: params["serviceaccount"],
Containers: []v1.Container{
{
Name: name,
Image: params["image"]
},
},
},
}
return &pod, nil
}
1.2 API group 和版本协商(version negotiation)
有了 runtime object 之后,kubectl 需要用合适的 API 将请求发送给 kube-apiserver。
API Group
K8s 用 API group 来管理 resource API。这是一种不同于 monolithic API(所有 API 扁平化)的 API 管理方式。
具体来说,。例如 Deployment 资源的 API group 名为 apps
,最新的版本是 v1
。这也是为什么 我们在创建 Deployment 时,需要在 yaml 中指定 apiVersion: apps/v1
的原因。
版本协商
生成 runtime object 之后,kubectl 就开始搜索合适的 API group 和版本[12]:
// staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
obj := generator.Generate(params) // 创建运行时对象
mapper := f.ToRESTMapper() // 寻找适合这个资源(对象)的 API group
然后创建一个正确版本的客户端(versioned client)[13],
// staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
gvks, _ := scheme.Scheme.ObjectKinds(obj)
mapping := mapper.RESTMapping(gvks[0].GroupKind(), gvks[0].Version)
这个客户端能感知资源的 REST 语义。
以上过程称为版本协商。在实现上,kubectl 会(OpenAPI 格式的 schema 文档),获取所有的 API groups。
出于性能考虑,kubectl 会缓存这份 OpenAPI schema[14], 路径是 ~/.kube/cache/discovery
。, 然后随便执行一条 kubectl 命令,并指定足够大的日志级别(例如 kubectl get ds -v 10
)。
发送 HTTP 请求
现在有了 runtime object,也找到了正确的 API,因此接下来就是 将请求真正发送出去[15]:
// staging/src/k8s.io/kubectl/pkg/cmd/cmd.go
actualObj = resource.
NewHelper(client, mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.fieldManager).
Create(o.Namespace, false, obj)
发送成功后,会以恰当的格式打印返回的消息。
1.3 客户端认证(client auth)
前面其实有意漏掉了一步:客户端认证。它发生在发送 HTTP 请求之前。
, 优先级从高到低:
命令行
--kubeconfig <file>
环境变量
$KUBECONFIG
某些预定义的路径[16],例如
~/.kube
。
,如下面所示:
apiVersion: v1
clusters:
- cluster:
certificate-authority: /etc/kubernetes/pki/ca.crt
server: https://192.168.2.100:443
name: k8s-cluster-1
contexts:
- context:
cluster: k8s-cluster-1
user: default-user
name: default-context
current-context: default-context
kind: Config
preferences: {}
users:
- name: default-user
user:
client-certificate: /etc/kubernetes/pki/admin.crt
client-key: /etc/kubernetes/pki/admin.key
有了这些信息之后,客户端就可以组装 HTTP 请求的认证头了。支持的认证方式有几种:
:放到 TLS[17] 中发送;
:放到 HTTP
"Authorization"
头中发送[18];:放到 HTTP basic auth 发送[19];
:需要先由用户手动处理,将其转成一个 token,然后和 bearer token 类似发送。
2 kube-apiserver
请求从客户端发出后,便来到服务端,也就是 kube-apiserver。
2.0 调用栈概览
buildGenericConfig
|-genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) // cmd/kube-apiserver/app/server.go
NewConfig // staging/src/k8s.io/apiserver/pkg/server/config.go
|-return &Config{
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
} /
/
/
/
DefaultBuildHandlerChain // staging/src/k8s.io/apiserver/pkg/server/config.go
|-handler := filterlatency.TrackCompleted(apiHandler)
|-handler = genericapifilters.WithAuthorization(handler)
|-handler = genericapifilters.WithAudit(handler)
|-handler = genericapifilters.WithAuthentication(handler)
|-return handler
WithAuthentication
|-withAuthentication
|-resp, ok := AuthenticateRequest(req)
| |-for h := range authHandler.Handlers {
| resp, ok := currAuthRequestHandler.AuthenticateRequest(req)
| if ok {
| return resp, ok, err
| }
| }
| return nil, false, utilerrors.NewAggregate(errlist)
|
|-audiencesAreAcceptable(apiAuds, resp.Audiences)
|-req.Header.Del("Authorization")
|-req = req.WithContext(WithUser(req.Context(), resp.User))
|-return handler.ServeHTTP(w, req)
2.1 认证(Authentication)
kube-apiserver 首先会对请求进行认证(authentication),以确保用户身份是合法的(verify that the requester is who they say they are)。
具体过程:启动时,检查所有的命令行参数[20],组织成一个 authenticator list,例如,
如果指定了
--client-ca-file
,就会将 x509 证书加到这个列表;如果指定了
--token-auth-file
,就会将 token 加到这个列表;
不同 anthenticator 做的事情有所不同:
x509 handler[21] 验证该 HTTP 请求是用 TLS key 加密的,并且有 CA root 证书的签名。
bearer token handler[22] 验证请求中带的 token(HTTP Authorization 头中),在 apiserver 的 auth file 中是存在的(
--token-auth-file
)。basicauth handler[23] 对 basic auth 信息进行校验。
,然后在上下文中加上用户信息[24]。这使得后面的步骤(例如鉴权和 admission control)能用到这里已经识别出的用户身份信息。
// staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go
// WithAuthentication creates an http handler that tries to authenticate the given request as a user, and then
// stores any such user found onto the provided context for the request.
// On success, "Authorization" header is removed from the request and handler
// is invoked to serve the request.
func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler,
apiAuds authenticator.Audiences) http.Handler {
return withAuthentication(handler, auth, failed, apiAuds, recordAuthMetrics)
}
func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler,
apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
resp, ok := auth.AuthenticateRequest(req) // 遍历所有 authenticator,任何一个成功就返回 OK
if !ok {
return failed.ServeHTTP(w, req) // 所有认证方式都失败了
}
if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
failed.ServeHTTP(w, req)
return
}
req.Header.Del("Authorization") // 认证成功后,这个 header 就没有用了,可以删掉
// 将用户信息添加到请求上下文中,供后面的步骤使用
req = req.WithContext(WithUser(req.Context(), resp.User))
handler.ServeHTTP(w, req)
})
}
AuthenticateRequest()
实现:遍历所有 authenticator,任何一个成功就返回 OK,
// staging/src/k8s.io/apiserver/pkg/authentication/request/union/union.go
func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req) (*Response, bool) {
for currAuthRequestHandler := range authHandler.Handlers {
resp, ok := currAuthRequestHandler.AuthenticateRequest(req)
if ok {
return resp, ok, err
}
}
return nil, false, utilerrors.NewAggregate(errlist)
}
2.2 鉴权(Authorization)
。因此确认发送者身份之后,还需要进行鉴权。
鉴权的过程与认证非常相似,也是逐个匹配 authorizer 列表中的 authorizer:如果都失败了, 返回 Forbidden
并停止 进一步处理[25]。如果成功,就继续。
内置的 :
webhook[26]:与其他服务交互,验证是否有权限。
ABAC[27]:根据静态文件中规定的策略(policies)来进行鉴权。
RBAC[28]:根据 role 进行鉴权,其中 role 是 k8s 管理员提前配置的。
Node[29]:确保 node clients,例如 kubelet,只能访问本机内的资源。
要看它们的具体做了哪些事情,可以查看它们各自的 Authorize()
方法。
2.3 Admission control
至此,认证和鉴权都通过了。但这还没结束,K8s 中的其它组件还需要对请求进行检查, 其中就包括 admission controllers[30]。
与鉴权的区别
鉴权(authorization)在前面,关注的是,
Admission controllers 在更后面,, 是。
工作方式
与认证和鉴权类似,也是遍历一个列表,
但有一点核心区别:。
设计:可扩展
每个 controller 作为一个 plugin 存放在 plugin/pkg/admission 目录[31],
设计时已经考虑,只需要实现很少的几个接口
但注意,(而非独立的 plugin binary)
类型
Admission controllers 通常按不同目的分类,包括:(referential consistency)等类型。
例如,下面是资源管理类的几个 controller:
InitialResources
:为容器设置默认的资源限制(基于过去的使用量);LimitRanger
:为容器的 requests and limits 设置默认值,或对特定资源设置上限(例如,内存默认 512MB,最高不超过 2GB)。ResourceQuota
:资源配额。
3 写入 etcd
至此,K8s 已经完成对请求的验证,允许它进行接下来的处理。
kube-apiserver 将对请求进行反序列化,构造 runtime objects持久化到 etcd。下面详细 看这个过程。
3.0 调用栈概览
对于本文创建 pod 的请求,相应的入口是POST handler[32],它又会进一步将请求委托给一个创建具体资源的 handler。
registerResourceHandlers // staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
|-case POST:
// staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
switch () {
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, .., handler)
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
doc = "create " + subresource + " of" + article + kind
}
route := ws.POST(action.Path).To(handler).
Doc(doc).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
AddObjectParams(ws, route, versionedCreateOptions)
addParams(route, action.Params)
routes = append(routes, route)
}
for route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
ws.Route(route)
}
3.1 kube-apiserver 请求处理过程
从 apiserver 的请求处理函数开始:
// staging/src/k8s.io/apiserver/pkg/server/handler.go
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
// check to see if our webservices want to claim this path
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
switch {
case ws.RootPath() == "/apis":
if path == "/apis" || path == "/apis/" {
return d.goRestfulContainer.Dispatch(w, req)
}
case strings.HasPrefix(path, ws.RootPath()):
if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
return d.goRestfulContainer.Dispatch(w, req)
}
}
}
// if we didn't find a match, then we just skip gorestful altogether
d.nonGoRestfulMux.ServeHTTP(w, req)
}
如果能匹配到请求(例如匹配到前面注册的路由),它将分派给相应的 handler[33];否则,fall back 到path-based handler[34](GET /apis
到达的就是这里);
基于 path 的 handlers:
// staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
return exactHandler.ServeHTTP(w, r)
}
for prefixHandler := range h.prefixHandlers {
if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
return prefixHandler.handler.ServeHTTP(w, r)
}
}
h.notFoundHandler.ServeHTTP(w, r)
}
如果还是没有找到路由,就会 fallback 到 non-gorestful handler,最终可能是一个 not found handler。
对于我们的场景,会匹配到一条已经注册的、名为`createHandler`[35]为的路由。
3.2 Create handler 处理过程
// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go
func createHandler(r rest.NamedCreater, scope *RequestScope, admit Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
namespace, name := scope.Namer.Name(req) // 获取资源的 namespace 和 name(etcd item key)
s := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
body := limitedReadBody(req, scope.MaxRequestBodyBytes)
obj, gvk := decoder.Decode(body, &defaultGVK, original)
admit = admission.WithAudit(admit, ae)
requestFunc := func() (runtime.Object, error) {
return r.Create(
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
)
}
result := finishRequest(ctx, func() (runtime.Object, error) {
if scope.FieldManager != nil {
liveObj := scope.Creater.New(scope.Kind)
obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
admit = fieldmanager.NewManagedFieldsValidatingAdmissionController(admit)
}
admit.(admission.MutationInterface)
mutatingAdmission.Handles(admission.Create)
mutatingAdmission.Admit(ctx, admissionAttributes, scope)
return requestFunc()
})
code := http.StatusCreated
status, ok := result.(*metav1.Status)
transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
}
}
首先解析 HTTP request,然后执行基本的验证,例如保证 JSON 与 versioned API resource 期望的是一致的;
执行审计和最终 admission;
将资源最终写到 etcd[36], 这会进一步调用到 storage provider[37]。
<namespace>/<name>
(例如,default/nginx-0
),但这个也是可配置的。最后,storage provider 执行一次
get
操作,确保对象真的创建成功了。如果有额外的收尾任务(additional finalization),会执行 post-create handlers 和 decorators。返回 生成的[38]HTTP response。
以上过程可以看出,apiserver 做了大量的事情。
总结:至此我们的 pod 资源已经在 etcd 中了。但是,此时 kubectl get pods -n <ns>
还看不见它。
4 Initializers
, 而是要先等一些 initializers[39] 运行完成。
4.1 Initializer
Initializer 是与特定资源类型(resource type)相关的 controller,
负责,
如果一种资源类型没有注册任何 initializer,这个步骤就会跳过,。
这是一种非常强大的特性,使得我们能。例如,
向 Pod 注入 sidecar、暴露 80 端口,或打上特定的 annotation。
向某个 namespace 内的所有 pod 注入一个存放了测试证书(test certificates)的 volume。
禁止创建长度小于 20 个字符的 Secret (例如密码)。
4.2 InitializerConfiguration
可以用 InitializerConfiguration
。
例如,要实现所有 pod 创建时都运行一个自定义的 initializer custom-pod-initializer
, 可以用下面的 yaml:
apiVersion: admissionregistration.k8s.io/v1alpha1
kind: InitializerConfiguration
metadata:
name: custom-pod-initializer
initializers:
- name: podimage.example.com
rules:
- apiGroups:
- ""
apiVersions:
- v1
resources:
- pods
创建以上配置(kubectl create -f xx.yaml
)之后,K8s 会将custom-pod-initializer
。
在此之前需要启动 initializer controller,它会
定期扫描是否有新 pod 创建;
当时,就会执行它的处理逻辑;
执行完成之后,它会将自己的名字从 pending list 中移除。
pending list 中的 initializers,每次只有第一个 initializer 能执行。当之后,就认为(considered initialized)。
细心的同学可能会有疑问:答案是:kube-apiserver 提供了一个 ?includeUninitialized
查询参数,它会返回所有对象, 包括那些还未完成初始化的(uninitialized ones)。
5 Control loops(控制循环)
至此,对象已经在 etcd 中了,所有的初始化步骤也已经完成了。下一步是设置资源拓扑(resource topology)。例如,一个 Deployment 其实就是一组 ReplicaSet,而一个 ReplicaSet 就是一组 Pod。K8s 是如何根据一个 HTTP 请求创建出这个层级关系的呢?靠的是 (controllers)。
K8s 中大量使用 "controllers",
一个 controller 就是一个异步脚本(an asynchronous script),
不断检查资源的(current state)和(desired state)是否一致,
如果不一致就尝试将其变成期望状态,这个过程称为 。
每个 controller 负责的东西都比较少,。
5.1 Deployments controller
Deployments controller 启动
当一个 Deployment record 存储到 etcd 并(被 initializers)初始化之后, kube-apiserver 就会将其置为对外可见的。此后, Deployment controller 监听了 Deployment 资源的变动,因此此时就会检测到这个新创建的资源。
// pkg/controller/deployment/deployment_controller.go
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer DeploymentInformer, rsInformer ReplicaSetInformer,
podInformer PodInformer, client clientset.Interface) (*DeploymentController, error) {
dc := &DeploymentController{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(),
}
dc.rsControl = controller.RealRSControl{ // ReplicaSet controller
KubeClient: client,
Recorder: dc.eventRecorder,
}
// 注册 Deployment 事件回调函数
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment, // 有 Deployment 创建时触发
UpdateFunc: dc.updateDeployment,
DeleteFunc: dc.deleteDeployment,
})
// 注册 ReplicaSet 事件回调函数
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
&