1. 概述
KubeAPIServer 主要是提供对 API Resource 的操作请求，为 kubernetes 中众多 API 注册路由信息，暴露 RESTful API 并且对外提供 kubernetes service，使集群中以及集群外的服务都可以通过 RESTful API 操作 kubernetes 中的资源。
2…1. 概述
KubeAPIServer 主要是提供对 API Resource 的操作请求，为 kubernetes 中众多 API 注册路由信息，暴露 RESTful API 并且对外提供 kubernetes service，使集群中以及集群外的服务都可以通过 RESTful API 操作 kubernetes 中的资源。
2. 代码流图及 3. 源码分析
3.1 启动main函数
func main() {command : app.NewAPIServerCommand()code : cli.Run(command)os.Exit(code)
}3.2 创建cobra.Command对象
// NewAPIServerCommand creates a *cobra.Command object with default parameters
// NewAPIServerCommand使用默认参数创建cobra.Command对象
func NewAPIServerCommand() *cobra.Command {s : options.NewServerRunOptions()cmd : cobra.Command{Use: kube-apiserver,Long: The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
clusters shared state through which all other components interact.,// stop printing usage when the command errorsSilenceUsage: true,PersistentPreRunE: func(*cobra.Command, []string) error {// silence client-go warnings.// kube-apiserver loopback clients should not log self-issued warnings.rest.SetDefaultWarningHandler(rest.NoWarnings{})return nil},RunE: func(cmd *cobra.Command, args []string) error {verflag.PrintAndExitIfRequested()fs : cmd.Flags()// Activate logging as soon as possible, after that// show flags with the final logging configuration.if err : logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err ! nil {return err}cliflag.PrintFlags(fs)// set default optionscompletedOptions, err : Complete(s)if err ! nil {return err}// validate optionsif errs : completedOptions.Validate(); len(errs) ! 0 {return utilerrors.NewAggregate(errs)}// add feature enablement metricsutilfeature.DefaultMutableFeatureGate.AddMetrics()return Run(completedOptions, genericapiserver.SetupSignalHandler())},Args: func(cmd *cobra.Command, args []string) error {for _, arg : range args {if len(arg) 0 {return fmt.Errorf(%q does not take any arguments, got %q, cmd.CommandPath(), args)}}return nil},}// 设置flag标识fs : cmd.Flags()namedFlagSets : s.Flags()verflag.AddFlags(namedFlagSets.FlagSet(global))globalflag.AddGlobalFlags(namedFlagSets.FlagSet(global), cmd.Name(), logs.SkipLoggingConfigurationFlags())options.AddCustomGlobalFlags(namedFlagSets.FlagSet(generic))for _, f : range namedFlagSets.FlagSets {fs.AddFlagSet(f)}cols, _, _ : term.TerminalSize(cmd.OutOrStdout())cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)return cmd
}3.3 APIServer启动流程主要逻辑为
1. 调用 CreateServerChain 构建服务调用链，并判断是否启动非安全的 http server；http server 链中包含 apiserver 要启动的三个 server，以及为每个 server 注册对应资源的路由；
2. 调用 server.PrepareRun 进行服务运行前的准备，该方法主要完成了健康检查、存活检查和 OpenAPI 路由的注册工作；
3. 调用 prepared.Run 启动 server。
// Run 运行指定的 APIServer,不会退出
func Run(completeOptions completedServerRunOptions, stopCh -chan struct{}) error {// 注意此版本不是k8s的versionklog.Infof(Version: %v, version.Get())klog.InfoS(Golang settings, GOGC, os.Getenv(GOGC), GOMAXPROCS, os.Getenv(GOMAXPROCS), GOTRACEBACK, os.Getenv(GOTRACEBACK))// 创建调用链通过delegation代理创建server其实是代理聚合后的serverserver, err : CreateServerChain(completeOptions)if err ! nil {return err}// 进行一些准备工作 注册一些hander执行hook等包括通过设置 OpenAPI 规范并调用通用apiserver PrepareRun 来准备运行聚合服务。prepared, err : server.PrepareRun()if err ! nil {return err}// 开始启动聚合服务return prepared.Run(stopCh)
}3.3.1 服务调用链分析
初始化阶段, 通过CreateServerChain创建调用链
创建过程主要有以下步骤
1. 根据配置构造 apiserver 的配置，调用方法为 CreateKubeAPIServerConfig；
2. 根据配置构造扩展的 apiserver 的配置，调用方法为 createAPIExtensionsConfig；
3. 创建 server，包括扩展的 apiserver 和原生的 apiserver，调用方法为 createAPIExtensionsServer 和 CreateKubeAPIServer。主要就是将各个 handler 的路由方法注册到 Container 中去，完全遵循 go-restful 的设计模式，即将处理方法注册到 Route 中去，同一个根路径下的 Route 注册到 WebService 中去，WebService 注册到 Container 中，Container 负责分发。访问的过程为 Container --> WebService --> Route；
4. 聚合 server 的配置和创建。主要就是将原生的 apiserver 和扩展的 apiserver 的访问进行整合，添加后续的一些处理接口。调用方法为 createAggregatorConfig 和 createAggregatorServer；
5. 创建完成，返回配置的 server 信息。
Aggregator 和 APIExtensionsServer 对应两种主要扩展 APIServer 资源的方式即分别是 AA 和 CRD。
// CreateServerChain创建通过委托连接的apiserver。
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {// 1. 为 kubeAPIServer 创建配置kubeAPIServerConfig, serviceResolver, pluginInitializer, err : CreateKubeAPIServerConfig(completedOptions)if err ! nil {return nil, err}// 2. 判断是否配置了 APIExtensionsServer创建 apiExtensionsConfig apiExtensionsConfig, err : createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))if err ! nil {return nil, err}// 返回一个 HTTP 处理程序该处理程序旨在在委托链的末尾执行。它检查是否在服务器安装所有已知的 HTTP 路径之前发出了请求。在这种情况下它返回 503 响应否则返回 404notFoundHandler : notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)//3. 创建 apiExtensionsServer 实例 完成apiextensionsConfig的完全配置 以及创建一个包含通用apiserver暴露group为apiextensions.k8s.io的api支持crd等操作的扩展服务apiExtensionsServer, err : createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))if err ! nil {return nil, err}// 4. 初始化 KubeAPIServer实例kubeAPIServer, err : CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)if err ! nil {return nil, err}// 5. 创建 AggregatorConfigaggregatorConfig, err : createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)if err ! nil {return nil, err}// 6. 初始化 AggregatorServer实例aggregatorServer, err : createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)if err ! 
nil {// we dont need special handling for innerStopCh because the aggregator server doesnt create any go routinesreturn nil, err}return aggregatorServer, nil
}
3.3.1.1 apiserver配置的创建
CreateKubeAPIServerConfig -> buildGenericConfig -> genericapiserver.NewConfig
// 创建运行 API 服务器的所有资源但不运行任何资源
func CreateKubeAPIServerConfig(s completedServerRunOptions) (*controlplane.Config,aggregatorapiserver.ServiceResolver,[]admission.PluginInitializer,error,
) {// 创建拨号器基础结构隧道和传输层以连接到节点。proxyTransport : CreateProxyTransport()// BuildGenericConfig 采用ServerRunOptions并生成与之关联的 genericapiserver.Config(kube-apiserver的通用配置)genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err : buildGenericConfig(s.ServerRunOptions, proxyTransport)if err ! nil {return nil, nil, nil, err}// s.AllowPrivileged 是否允许特权// 限制每个连接的吞吐量目前只用于proxy、exec、attach方式capabilities.Setup(s.AllowPrivileged, s.MaxConnectionBytesPerSec)// 应用指标选项 -- 执行些预置操作比如启用 禁用指标s.Metrics.Apply()// 注册指标收集器serviceaccount.RegisterMetrics()// 构造控制平面controlplane的配置config : controlplane.Config{GenericConfig: genericConfig,ExtraConfig: controlplane.ExtraConfig{APIResourceConfigSource: storageFactory.APIResourceConfigSource,StorageFactory: storageFactory,EventTTL: s.EventTTL,KubeletClientConfig: s.KubeletConfig,EnableLogsSupport: s.EnableLogsHandler,ProxyTransport: proxyTransport,ServiceIPRange: s.PrimaryServiceClusterIPRange,APIServerServiceIP: s.APIServerServiceIP,SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,APIServerServicePort: 443,ServiceNodePortRange: s.ServiceNodePortRange,KubernetesServiceNodePort: s.KubernetesServiceNodePort,EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),MasterCount: s.MasterCount,ServiceAccountIssuer: s.ServiceAccountIssuer,ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration,VersionedInformers: versionedInformers,},}// 获取 用来获取并验证证书内容的提供器clientCAProvider, err : s.Authentication.ClientCert.GetClientCAContentProvider()if err ! nil {return nil, nil, nil, err}// 设置controlplane配置的验证证书内容的提供器config.ExtraConfig.ClusterAuthenticationInfo.ClientCA clientCAProvider// 用来设置controlplane配置的请求头信息包括认证证书等requestHeaderConfig, err : s.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()if err ! nil {return nil, nil, nil, err}if requestHeaderConfig ! 
nil {config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA requestHeaderConfig.CAContentProviderconfig.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames requestHeaderConfig.AllowedClientNamesconfig.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes requestHeaderConfig.ExtraHeaderPrefixesconfig.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders requestHeaderConfig.GroupHeadersconfig.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders requestHeaderConfig.UsernameHeaders}// 添加PostStartHook钩子函数if err : config.GenericConfig.AddPostStartHook(start-kube-apiserver-admission-initializer, admissionPostStartHook); err ! nil {return nil, nil, nil, err}// 分发流量 -- controlplane cluster etcd等if config.GenericConfig.EgressSelector ! nil {// 使用config.GenericConfig.EgressSelector loop查找以找到连接到 kubelet 的拨号器config.ExtraConfig.KubeletClientConfig.Lookup config.GenericConfig.EgressSelector.Lookup// 使用 config.GenericConfig.EgressSelector 查找作为“代理”子资源使用的传输networkContext : egressselector.Cluster.AsNetworkContext()dialer, err : config.GenericConfig.EgressSelector.Lookup(networkContext)if err ! nil {return nil, nil, nil, err}c : proxyTransport.Clone()// 这里会替换拨号器c.DialContext dialerconfig.ExtraConfig.ProxyTransport c}// 加载公钥var pubKeys []interface{}for _, f : range s.Authentication.ServiceAccounts.KeyFiles {// 遍历serviceaccount的认证公私钥文件keys, err : keyutil.PublicKeysFromFile(f)if err ! nil {return nil, nil, nil, fmt.Errorf(failed to parse key file %q: %v, f, err)}// 追加到公钥数组中pubKeys append(pubKeys, keys...)}// 设置serviceaccount的证书标识及公钥config.ExtraConfig.ServiceAccountIssuerURL s.Authentication.ServiceAccounts.Issuers[0]config.ExtraConfig.ServiceAccountJWKSURI s.Authentication.ServiceAccounts.JWKSURIconfig.ExtraConfig.ServiceAccountPublicKeys pubKeysreturn config, serviceResolver, pluginInitializers, nil
}// 采用ServerRunOptions并生成与之关联的 genericapiserver.Config(kube-apiserver的通用配置)
func buildGenericConfig(s *options.ServerRunOptions,proxyTransport *http.Transport,
) (genericConfig *genericapiserver.Config,versionedInformers clientgoinformers.SharedInformerFactory,serviceResolver aggregatorapiserver.ServiceResolver,pluginInitializers []admission.PluginInitializer,admissionPostStartHook genericapiserver.PostStartHookFunc,storageFactory *serverstorage.DefaultStorageFactory,lastErr error,
) {// 使用apiserver 生成apiserver genericConfiggenericConfig genericapiserver.NewConfig(legacyscheme.Codecs)// 获取默认所有的gvenable DisablegenericConfig.MergedResourceConfig controlplane.DefaultAPIResourceConfigSource()// 把s.GenericServerRunOptions相关配置应用到genericConfigif lastErr s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr ! nil {return}// 把s.SecureServing相关配置应用到genericConfig.SecureServing 并配置LoopbackClientConfig(具体看实现是否覆盖参数genericConfig.LoopbackClientConfig)if lastErr s.SecureServing.ApplyTo(genericConfig.SecureServing, genericConfig.LoopbackClientConfig); lastErr ! nil {return}// 配置是否开启debug pprof和 争用debug pprof功能if lastErr s.Features.ApplyTo(genericConfig); lastErr ! nil {return}// 将给定的 defaultAPIResourceConfig 与给定的 resourceConfigOverrides 合并。(合并原则是以defaultAPIResourceConfig为基础以resourceConfigOverrides中的设置为目标)if lastErr s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr ! nil {return}// 将出口选择器设置EgressSelectorOptions中的设置项添加到服务器配置中if lastErr s.EgressSelector.ApplyTo(genericConfig); lastErr ! nil {return}// 如果APIServerTracing对应在FeatureGate中设置为true开启if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {// 使用设置的追踪选项TracingOptions配置apiserver的跟踪配置if lastErr s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr ! 
nil {return}}// 包装定义以恢复禁用功能的任何更改 用来生成k8s的open api(类似于swagger)getOpenAPIDefinitions : openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)// 使用DefaultOpenAPIConfig设置为OpenAPIConfig的默认值genericConfig.OpenAPIConfig genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))// 设置OpenAPIConfig的标题为KubernetesgenericConfig.OpenAPIConfig.Info.Title Kubernetes// 如果默认开启了OpenAPIV3功能if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {genericConfig.OpenAPIV3Config genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))genericConfig.OpenAPIV3Config.Info.Title Kubernetes}// 判断是否是长运行请求就是保持长期会话比如watch动作就需要持续监听的方法genericConfig.LongRunningFunc filters.BasicLongRunningRequestCheck(sets.NewString(watch, proxy),sets.NewString(attach, exec, proxy, log, portforward),)// 版本信息这里主要是git和go的版本信息kubeVersion : version.Get()genericConfig.Version kubeVersionif genericConfig.EgressSelector ! nil {// 配置EgressLookup -- 包含controlplane etcd cluster模式s.Etcd.StorageConfig.Transport.EgressLookup genericConfig.EgressSelector.Lookup}// 配置链路追踪if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {s.Etcd.StorageConfig.Transport.TracerProvider genericConfig.TracerProvider} else {s.Etcd.StorageConfig.Transport.TracerProvider oteltrace.NewNoopTracerProvider()}if lastErr s.Etcd.Complete(genericConfig.StorageObjectCountTracker, genericConfig.DrainedNotify(), genericConfig.AddPostStartHook); lastErr ! 
nil {return}// 向c中添加etcd的健康检查并修改c中用于获取gr对应的RESTOptions的RESTOptionsGetter覆盖s中StorageConfig.StorageObjectCountTracker属性storageFactoryConfig : kubeapiserver.NewStorageFactoryConfig()storageFactoryConfig.APIResourceConfig genericConfig.MergedResourceConfigstorageFactory, lastErr storageFactoryConfig.Complete(s.Etcd).New()if lastErr ! nil {return}if lastErr s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr ! nil {return}// Use protobufs for self-communication.// Since not every generic apiserver has to support protobufs, we// cannot default to it in generic apiserver and need to explicitly// set it in kube-apiserver.genericConfig.LoopbackClientConfig.ContentConfig.ContentType application/vnd.kubernetes.protobuf// Disable compression for self-communication, since we are going to be// on a fast local networkgenericConfig.LoopbackClientConfig.DisableCompression truekubeClientConfig : genericConfig.LoopbackClientConfigclientgoExternalClient, err : clientgoclientset.NewForConfig(kubeClientConfig)if err ! nil {lastErr fmt.Errorf(failed to create real external clientset: %v, err)return}versionedInformers clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if presentif lastErr s.Authentication.ApplyTo(genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr ! nil {return}// 构建授权器和授权规则解析器genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)if err ! 
nil {lastErr fmt.Errorf(invalid authorization config: %v, err)return}// 如果授权mode中没有RABC模式if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {// 将rbac/bootstrap-roles插入到在不可用的PostStartHook集合中genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)}// 应用Audit日志审计功能到genericConfiglastErr s.Audit.ApplyTo(genericConfig)if lastErr ! nil {return}// 初始化准入插件所需的配置admissionConfig : kubeapiserveradmission.Config{ExternalInformers: versionedInformers,LoopbackClientConfig: genericConfig.LoopbackClientConfig,CloudConfigFile: s.CloudProvider.CloudConfigFile,}// 根据enabledAggregatorRouting构建service解析器serviceResolver buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)// 获取准入所需的插件和开始挂钩pluginInitializers, admissionPostStartHook, err admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)if err ! nil {lastErr fmt.Errorf(failed to create admission plugin initializer: %v, err)return}// 将准入链选项添加到服务器配置中err s.Admission.ApplyTo(genericConfig,versionedInformers,kubeClientConfig,utilfeature.DefaultFeatureGate,pluginInitializers...)if err ! nil {lastErr fmt.Errorf(failed to initialize admission: %v, err)return}// 如果APIPriorityAndFairness功能开启且 s.GenericServerRunOptions.EnablePriorityAndFairness true默认为trueif utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) s.GenericServerRunOptions.EnablePriorityAndFairness {genericConfig.FlowControl, lastErr BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)}// 构建 API 优先级和公平性过滤器的核心if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {genericConfig.AggregatedDiscoveryGroupManager aggregated.NewResourceManager()}return
}
3.3.1.2 构建apiextensionsapiserver.Config扩展配置 – 其实就是包装了通用apiserver配置和其他额外的配置
// 构建apiextensionsapiserver.Config扩展配置-- 其实就是包装了通用apiserver配置和其他额外的配置
func createAPIExtensionsConfig(kubeAPIServerConfig genericapiserver.Config,externalInformers kubeexternalinformers.SharedInformerFactory,pluginInitializers []admission.PluginInitializer,commandOptions *options.ServerRunOptions,masterCount int,serviceResolver webhook.ServiceResolver,authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
) (*apiextensionsapiserver.Config, error) {// 做一个浅拷贝让我们更改一些配置大多数配置实际上保持不变。我们只需要处理一些与 apiextensions 的细节相关的配置genericConfig : kubeAPIServerConfiggenericConfig.PostStartHooks map[string]genericapiserver.PostStartHookConfigEntry{}genericConfig.RESTOptionsGetter nil// 使用备份不能直接修改原始数据的Etcd选项来做一些修改etcdOptions : *commandOptions.Etcd// 获取是否支持分页etcdOptions.StorageConfig.Paging utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)// 构建编解码器 -- 解码为v1beta或者v1编码为internaletcdOptions.StorageConfig.Codec apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)// prefer the more compact serialization (v1beta1) for storage until https://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be storedetcdOptions.StorageConfig.EncodeVersioner runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})etcdOptions.SkipHealthEndpoints true // avoid double wiring of health checksif err : etcdOptions.ApplyTo(genericConfig); err ! nil {return nil, err}// 使用 apiextensions 默认值和注册表覆盖 MergedResourceConfigif err : commandOptions.APIEnablement.ApplyTo(genericConfig,apiextensionsapiserver.DefaultAPIResourceConfigSource(),apiextensionsapiserver.Scheme); err ! nil {return nil, err}// 构建一个生成etcdOptions的工厂crdRESTOptionsGetter, err : apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions)if err ! 
nil {return nil, err}apiextensionsConfig : apiextensionsapiserver.Config{GenericConfig: genericapiserver.RecommendedConfig{Config: genericConfig,SharedInformerFactory: externalInformers,},ExtraConfig: apiextensionsapiserver.ExtraConfig{CRDRESTOptionsGetter: crdRESTOptionsGetter,MasterCount: masterCount,AuthResolverWrapper: authResolverWrapper,ServiceResolver: serviceResolver,},}// 因为在之前的 CreateKubeAPIServerConfig 函数已经执行过了AddPostStartHook所以我们需要清除 poststarthooks这样我们就不会将它们多次失败时添加到所有服务器.apiextensionsConfig.GenericConfig.PostStartHooks map[string]genericapiserver.PostStartHookConfigEntry{}return apiextensionsConfig, nil
}
3.3.1.3 创建APIExtensionsServer
APIExtensionsServer 最先被初始化，位于调用链的末尾，负责处理 CR、CRD 相关资源，其中包含的 controller 及其功能如下：
- openapiController：将 crd 资源的变化同步至提供的 OpenAPI 文档，可通过访问 /openapi/v2 进行查看；
- crdController：负责将 crd 信息注册到 apiVersions 和 apiResources 中，两者的信息可通过 $ kubectl api-versions 和 $ kubectl api-resources 查看；
- namingController：检查 crd obj 中是否有命名冲突，可在 crd .status.conditions 中查看；
- establishingController：检查 crd 是否处于正常状态，可在 crd .status.conditions 中查看；
- nonStructuralSchemaController：检查 crd obj 结构是否正常，可在 crd .status.conditions 中查看；
- apiApprovalController：检查 crd 是否遵循 kubernetes API 声明策略，可在 crd .status.conditions 中查看；
- finalizingController：类似于 finalizes 的功能，与 CRs 的删除有关。
// createAPIExtensionsServer completes the given apiextensions configuration
// and creates the APIExtensionsServer from it, using delegateAPIServer as
// the delegation target. Per the article, this server exposes the
// apiextensions.k8s.io API group (CRD operations).
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
	return apiextensionsConfig.Complete().New(delegateAPIServer)
}

// Source: k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {// 初始化 genericServergenericServer, err : c.GenericConfig.New(apiextensions-apiserver, delegationTarget)if err ! nil {return nil, err}// 当 CRD informer server已经完全同步时hasCRDInformerSyncedSignal就会关闭。它可确保在服务器尚未安装所有已知 HTTP 路径时对潜在自定义资源终结点的请求收到 503 错误而不是 404hasCRDInformerSyncedSignal : make(chan struct{})if err : genericServer.RegisterMuxAndDiscoveryCompleteSignal(CRDInformerHasNotSynced, hasCRDInformerSyncedSignal); err ! nil {return nil, err}s : CustomResourceDefinitions{GenericAPIServer: genericServer,}// 初始化apigroup, 即需要暴露的api这里extension apiserver只注册了cr和crd相关的apiResourceConfig : c.GenericConfig.MergedResourceConfigapiGroupInfo : genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)storage : map[string]rest.Storage{}// customresourcedefinitionsif resource : customresourcedefinitions; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {customResourceDefinitionStorage, err : customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)if err ! nil {return nil, err}storage[resource] customResourceDefinitionStoragestorage[resource/status] customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)}if len(storage) 0 {apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] storage}// 注册apigroupif err : s.GenericAPIServer.InstallAPIGroup(apiGroupInfo); err ! nil {return nil, err}// clientset创建crdClient, err : clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)if err ! 
nil {// its really bad that this is leaking here, but until we can fix the test (which Im pretty sure isnt even testing what it wants to test),// we need to be able to move forwardreturn nil, fmt.Errorf(failed to create clientset: %v, err)}s.Informers externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)delegateHandler : delegationTarget.UnprotectedHandler()if delegateHandler nil {delegateHandler http.NotFoundHandler()}versionDiscoveryHandler : versionDiscoveryHandler{discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},delegate: delegateHandler,}groupDiscoveryHandler : groupDiscoveryHandler{discovery: map[string]*discovery.APIGroupHandler{},delegate: delegateHandler,}establishingController : establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())crdHandler, err : NewCustomResourceDefinitionHandler(versionDiscoveryHandler,groupDiscoveryHandler,s.Informers.Apiextensions().V1().CustomResourceDefinitions(),delegateHandler,c.ExtraConfig.CRDRESTOptionsGetter,c.GenericConfig.AdmissionControl,establishingController,c.ExtraConfig.ServiceResolver,c.ExtraConfig.AuthResolverWrapper,c.ExtraConfig.MasterCount,s.GenericAPIServer.Authorizer,c.GenericConfig.RequestTimeout,time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,apiGroupInfo.StaticOpenAPISpec,c.GenericConfig.MaxRequestBodyBytes,)if err ! 
nil {return nil, err}s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(/apis, crdHandler)s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix(/apis/, crdHandler)s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy)discoveryController : NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, genericServer.AggregatedDiscoveryGroupManager)namingController : status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())nonStructuralSchemaController : nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())apiApprovalController : apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())finalizingController : finalizer.NewCRDFinalizer(s.Informers.Apiextensions().V1().CustomResourceDefinitions(),crdClient.ApiextensionsV1(),crdHandler,)// 将 informer 以及 controller加入到启动hook中s.GenericAPIServer.AddPostStartHookOrDie(start-apiextensions-informers, func(context genericapiserver.PostStartHookContext) error {s.Informers.Start(context.StopCh)return nil})s.GenericAPIServer.AddPostStartHookOrDie(start-apiextensions-controllers, func(context genericapiserver.PostStartHookContext) error {// OpenAPIVersionedService 和 StaticOpenAPISpec 填充在通用 apiserver PrepareRun 中。// 它们一起服务于通用 API 服务器上的 /openapi/v2 endpoint。 // 用 apiserver 可以选择不启用OpenAPI方法是使用空的openAPIConfig从而OpenAPIVersionedService// 和 StaticOpenAPISpec 都是空的。在这种情况下我们不会运行CRD OpenAPI控制器。if s.GenericAPIServer.StaticOpenAPISpec ! nil {if s.GenericAPIServer.OpenAPIVersionedService ! 
nil {openapiController : openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())go openapiController.Run(s.G enericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)}if s.GenericAPIServer.OpenAPIV3VersionedService ! nil utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {openapiv3Controller : openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)}}go namingController.Run(context.StopCh)go establishingController.Run(context.StopCh)go nonStructuralSchemaController.Run(5, context.StopCh)go apiApprovalController.Run(5, context.StopCh)go finalizingController.Run(5, context.StopCh)discoverySyncedCh : make(chan struct{})go discoveryController.Run(context.StopCh, discoverySyncedCh)select {case -context.StopCh:case -discoverySyncedCh:}return nil})// 在我们可以处理所有已注册的 CRD 之前我们不想报告健康检查状况。等到informer sync确保lister 在开始前都是有效的。// 启动后可能还会有一段时间去添加CRDS。除非否则不会进行健康检查// we dont want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer// to sync makes sure that the lister will be valid before we begin. There may still be races for CRDs added after startup,// but we wont go healthy until we can handle the ones already present.s.GenericAPIServer.AddPostStartHookOrDie(crd-informer-synced, func(context genericapiserver.PostStartHookContext) error {return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {close(hasCRDInformerSyncedSignal)return true, nil}return false, nil}, context.StopCh)})return s, nil
}3.3.1.4 创建kubeAPIServer及其初始化
KubeAPIServer 主要是提供对 API Resource 的操作请求，为 kubernetes 中众多 API 注册路由信息，暴露 RESTful API 并且对外提供 kubernetes service，使集群中以及集群外的服务都可以通过 RESTful API 操作 kubernetes 中的资源。
KubeAPIServer初始化流程如下
1. 调用 c.GenericConfig.New 初始化 GenericAPIServer；
2. 判断是否支持 logs 相关的路由，如果支持，则添加 /logs 路由；
3. 调用 m.InstallLegacyAPI 将核心 API Resource 添加到路由中，对应到 apiserver 就是以 /api 开头的 resource；
4. 调用 m.InstallAPIs 将扩展的 API Resource 添加到路由中，在 apiserver 中即是以 /apis 开头的 resource。
// CreateKubeAPIServer创建kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {kubeAPIServer, err : kubeAPIServerConfig.Complete().New(delegateAPIServer)if err ! nil {return nil, err}return kubeAPIServer, nil
}// New 从给定的配置返回一个新的 Master 实例。如果未设置某些配置字段将设置为默认值。
// 必须指定某些配置字段包括KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {return nil, fmt.Errorf(Master.New() called with empty config.KubeletClientConfig)}s, err : c.GenericConfig.New(kube-apiserver, delegationTarget)if err ! nil {return nil, err}// 注册 logs 相关的路由if c.ExtraConfig.EnableLogsSupport {routes.Logs{}.Install(s.Handler.GoRestfulContainer)}// 元数据和key目前预计只会在重新启动后更改// 因此我们只需立即封送并提供缓存的 JSON 字节。s.md, err : serviceaccount.NewOpenIDMetadata(c.ExtraConfig.ServiceAccountIssuerURL,c.ExtraConfig.ServiceAccountJWKSURI,c.GenericConfig.ExternalAddress,c.ExtraConfig.ServiceAccountPublicKeys,)if err ! nil {// 如果出现错误请跳过安装endpoints 并记录错误请继续。我们不会返回错误因为// 元数据响应需要额外的向后进行不兼容命令行选项的验证。msg : fmt.Sprintf(Could not construct pre-rendered responses for ServiceAccountIssuerDiscovery endpoints. Endpoints will not be enabled. Error: %v, err)if c.ExtraConfig.ServiceAccountIssuerURL ! {// The user likely expects this feature to be enabled if issuer URL is// set and the feature gate is enabled. In the future, if there is no// longer a feature gate and issuer URL is not set, the user may not// expect this feature to be enabled. We log the former case as an Error// and the latter case as an Info.klog.Error(msg)} else {klog.Info(msg)}} else {routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).Install(s.Handler.GoRestfulContainer)}m : Instance{GenericAPIServer: s,ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,}// 安装 LegacyAPIif err : m.InstallLegacyAPI(c, c.GenericConfig.RESTOptionsGetter); err ! nil {return nil, err}clientset, err : kubernetes.NewForConfig(c.GenericConfig.LoopbackClientConfig)if err ! 
nil {return nil, err}// TODO: update to a version that caches success but will recheck on failure, unlike memcache discoverydiscoveryClientForAdmissionRegistration : clientset.Discovery()// The order here is preserved in discovery.// If resources with identical names exist in more than one of these groups (e.g. deployments.apps and deployments.extensions),// the order of this list determines which group an unqualified resource name (e.g. deployments) should prefer.// This priority order is used for local discovery, but it ends up aggregated in k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go// with specific priorities.// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery// handlers that we have.restStorageProviders : []RESTStorageProvider{apiserverinternalrest.StorageProvider{},authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},autoscalingrest.RESTStorageProvider{},batchrest.RESTStorageProvider{},certificatesrest.RESTStorageProvider{},coordinationrest.RESTStorageProvider{},discoveryrest.StorageProvider{},networkingrest.RESTStorageProvider{},noderest.RESTStorageProvider{},policyrest.RESTStorageProvider{},rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},schedulingrest.RESTStorageProvider{},storagerest.RESTStorageProvider{},flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.// See https://github.com/kubernetes/kubernetes/issues/42392appsrest.StorageProvider{},admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: 
discoveryClientForAdmissionRegistration},eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},resourcerest.RESTStorageProvider{},}// 只能APIif err : m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err ! nil {return nil, err}m.GenericAPIServer.AddPostStartHookOrDie(start-cluster-authentication-info-controller, func(hookContext genericapiserver.PostStartHookContext) error {kubeClient, err : kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err ! nil {return err}controller : clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver// TODO: See if we can pass ctx to the current methodctx, cancel : context.WithCancel(context.Background())go func() {select {case -hookContext.StopCh:cancel() // stopCh closed, so cancel our contextcase -ctx.Done():}}()// prime values and start listenersif m.ClusterAuthenticationInfo.ClientCA ! nil {m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)if controller, ok : m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {// runonce to be sure that we have a value.if err : controller.RunOnce(ctx); err ! nil {runtime.HandleError(err)}go controller.Run(ctx, 1)}}if m.ClusterAuthenticationInfo.RequestHeaderCA ! nil {m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)if controller, ok : m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {// runonce to be sure that we have a value.if err : controller.RunOnce(ctx); err ! 
nil {runtime.HandleError(err)}go controller.Run(ctx, 1)}}go controller.Run(ctx, 1)return nil})if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {m.GenericAPIServer.AddPostStartHookOrDie(start-kube-apiserver-identity-lease-controller, func(hookContext genericapiserver.PostStartHookContext) error {kubeClient, err : kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err ! nil {return err}leaseName : m.GenericAPIServer.APIServerIDholderIdentity : m.GenericAPIServer.APIServerID _ string(uuid.NewUUID())controller : lease.NewController(clock.RealClock{},kubeClient,holderIdentity,int32(IdentityLeaseDurationSeconds),nil,IdentityLeaseRenewIntervalPeriod,leaseName,metav1.NamespaceSystem,labelAPIServerHeartbeat)go controller.Run(hookContext.StopCh)return nil})m.GenericAPIServer.AddPostStartHookOrDie(start-kube-apiserver-identity-lease-garbage-collector, func(hookContext genericapiserver.PostStartHookContext) error {kubeClient, err : kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err ! nil {return err}go apiserverleasegc.NewAPIServerLeaseGC(kubeClient,IdentityLeaseGCPeriod,metav1.NamespaceSystem,KubeAPIServerIdentityLeaseLabelSelector,).Run(hookContext.StopCh)return nil})}m.GenericAPIServer.AddPostStartHookOrDie(start-legacy-token-tracking-controller, func(hookContext genericapiserver.PostStartHookContext) error {kubeClient, err : kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err ! nil {return err}go legacytokentracking.NewController(kubeClient).Run(hookContext.StopCh)return nil})return m, nil
}
3.3.1.4.1 m.InstallLegacyAPI
功能：将 core API 注册到路由中，是 apiserver 初始化流程中最核心的方法之一。将 API 注册到路由，其最终的目的就是对外提供 RESTful API 来操作对应 resource。注册 API 主要分为两步：第一步是为 API 中的每个 resource 初始化 RESTStorage，以此操作后端存储中数据的变更；第二步是为每个 resource 根据其 verbs 构建对应的路由。m.InstallLegacyAPI 的主要逻辑为：
1、调用 legacyRESTStorageProvider.NewLegacyRESTStorage 为 LegacyAPI 中各个资源创建 RESTStorageRESTStorage 的目的是将每种资源的访问路径及其后端存储的操作对应起来2、初始化 bootstrap-controller并将其加入到 PostStartHook 中bootstrap-controller 是 apiserver 中的一个 controller主要功能是创建系统所需要的一些 namespace 以及创建 kubernetes service 并定期触发对应的 sync 操作apiserver 在启动后会通过调用 PostStartHook 来启动 bootstrap-controller3、在为资源创建完 RESTStorage 后调用 m.GenericAPIServer.InstallLegacyAPIGroup 为 APIGroup 注册路由信息InstallLegacyAPIGroup方法的调用链非常深主要为InstallLegacyAPIGroup-- installAPIResources -- InstallREST -- Install -- registerResourceHandlers最终核心的路由构造在registerResourceHandlers方法内该方法比较复杂其主要功能是通过上一步骤构造的 REST Storage 判断该资源可以执行哪些操作如 create、update等将其对应的操作存入到 action 中每一个 action 对应一个标准的 REST 操作如 create 对应的 action 操作为 POST、update 对应的 action 操作为PUT。最终根据 actions 数组依次遍历对每一个操作添加一个 handler 方法注册到 route 中去再将 route 注册到 webservice 中去webservice 最终会注册到 container 中遵循 go-restful 的设计模式

// InstallLegacyAPI builds REST storage for the core API group, registers the
// bootstrap controller's post-start and pre-shutdown hooks, and installs the
// group under genericapiserver.DefaultLegacyAPIPrefix.
// It returns nil without installing anything when no core storage is enabled.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {
	legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
		StorageFactory:              c.ExtraConfig.StorageFactory,
		ProxyTransport:              c.ExtraConfig.ProxyTransport,
		KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
		EventTTL:                    c.ExtraConfig.EventTTL,
		ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
		SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
		ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
		LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
		ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
		ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
		ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
		APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
	}
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
	if err != nil {
		return fmt.Errorf("error building core storage: %v", err)
	}
	if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 { // if all core storage is disabled, return.
		return nil
	}

	controllerName := "bootstrap-controller"
	client := kubernetes.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
	bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, client)
	if err != nil {
		return fmt.Errorf("error creating bootstrap controller: %v", err)
	}
	m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
	m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}

3.3.1.4.2 m.InstallAPIs
InstallAPIs 和 InstallLegacyAPI 的调用流程类似。
3.3.1.5 创建AggregatorConfig
func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config,commandOptions *options.ServerRunOptions,externalInformers kubeexternalinformers.SharedInformerFactory,serviceResolver aggregatorapiserver.ServiceResolver,proxyTransport *http.Transport,pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {// 浅拷贝kubeAPIServerConfig修改其中部分配置genericConfig.PostStartHooks map[string]genericapiserver.PostStartHookConfigEntry{}genericConfig.RESTOptionsGetter nil// 阻止通用 API 服务器安装 OpenAPI 处理程序。Aggregator server有自己的自定义 OpenAPI 处理程序。genericConfig.SkipOpenAPIInstallation trueif utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {// Add StorageVersionPrecondition handler to aggregator-apiserver.// The handler will block write requests to built-in resources until the// target resources storage versions are up-to-date.genericConfig.BuildHandlerChainFunc genericapiserver.BuildHandlerChainWithStorageVersionPrecondition}// 拷贝etcd参数etcdOptions : *commandOptions.EtcdetcdOptions.StorageConfig.Paging utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIListChunking)etcdOptions.StorageConfig.Codec aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)etcdOptions.StorageConfig.EncodeVersioner runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})etcdOptions.SkipHealthEndpoints true // avoid double wiring of health checksif err : etcdOptions.ApplyTo(genericConfig); err ! nil {return nil, err}// 使用aggregator默认值和注册表覆盖MergedResourceConfig配置if err : commandOptions.APIEnablement.ApplyTo(genericConfig,aggregatorapiserver.DefaultAPIResourceConfigSource(),aggregatorscheme.Scheme); err ! 
nil {return nil, err}// etcd参数配置aggregatorConfig : aggregatorapiserver.Config{GenericConfig: genericapiserver.RecommendedConfig{Config: genericConfig,SharedInformerFactory: externalInformers,},ExtraConfig: aggregatorapiserver.ExtraConfig{ProxyClientCertFile: commandOptions.ProxyClientCertFile,ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,ServiceResolver: serviceResolver,ProxyTransport: proxyTransport,RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,},}// 需要清除 poststarthook这样我们就不会将它们多次添加到所有server失败后aggregatorConfig.GenericConfig.PostStartHooks map[string]genericapiserver.PostStartHookConfigEntry{}return aggregatorConfig, nil
}
3.3.1.6 创建 AggregatorServer
Aggregator 通过 APIServices 对象关联到某个 Service 来进行请求的转发，其关联的 Service 类型进一步决定了请求转发形式。Aggregator 包括一个 GenericAPIServer 和维护自身状态的 Controller，其中 GenericAPIServer 主要处理 apiregistration.k8s.io 组下的 APIService 资源请求。
主要实现逻辑：
1. 调用 aggregatorConfig.Complete().NewWithDelegate 创建 aggregatorServer；
2. 初始化 crdRegistrationController 和 autoRegistrationController：crdRegistrationController 负责注册 CRD，autoRegistrationController 负责将 CRD 对应的 APIServices 自动注册到 apiserver 中，CRD 创建后可通过 $ kubectl get apiservices 查看是否注册到 apiservices 中；
3. 将 autoRegistrationController 和 crdRegistrationController 加入到 PostStartHook 中。
Aggregator 除了处理资源请求外，还包含几个 controller：
- apiserviceRegistrationController：负责 APIServices 中资源的注册与删除；
- availableConditionController：维护 APIServices 的可用状态，包括其引用 Service 是否可用等；
- autoRegistrationController：用于保持 API 中存在的一组特定的 APIServices；
- crdRegistrationController：负责将 CRD GroupVersions 自动注册到 APIServices 中；
- openAPIAggregationController：将 APIServices 资源的变化同步至提供的 OpenAPI 文档。

kubernetes 中的一些附加组件，比如 metrics-server，就是通过 Aggregator 的方式进行扩展的；实际环境中可以通过使用 apiserver-builder 工具轻松以 Aggregator 的扩展方式创建自定义资源。
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {// 初始化aggregatorServeraggregatorServer, err : aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)if err ! nil {return nil, err}// 创建并初始化 auto-registrationapiRegistrationClient, err : apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)if err ! nil {return nil, err}autoRegistrationController : autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)apiServices : apiServicesToRegister(delegateAPIServer, autoRegistrationController)crdRegistrationController : crdregistration.NewCRDRegistrationController(apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),autoRegistrationController)// Imbue all builtin group-priorities onto the aggregated discoveryif aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager ! nil {for gv, entry : range apiVersionPriorities {aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version))}}err aggregatorServer.GenericAPIServer.AddPostStartHook(kube-apiserver-autoregistration, func(context genericapiserver.PostStartHookContext) error {go crdRegistrationController.Run(5, context.StopCh)go func() {// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.// this prevents the autoregistration controllers initial sync from deleting APIServices for CRDs that still exist.// we only need to do this if CRDs are enabled on this server. 
We cant use discovery because we are the source for discovery.if aggregatorConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource(customresourcedefinitions)) {crdRegistrationController.WaitForInitialSync()}autoRegistrationController.Run(5, context.StopCh)}()return nil})if err ! nil {return nil, err}// 添加健康检查err aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(makeAPIServiceAvailableHealthCheck(autoregister-completion,apiServices,aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),),)if err ! nil {return nil, err}return aggregatorServer, nil
}3.3.1.6.1 aggregatorConfig.Complete().NewWithDelegate
aggregatorConfig.Complete().NewWithDelegate 是初始化 aggregatorServer 的方法，主要逻辑为：
1. 调用 c.GenericConfig.New 初始化 GenericAPIServer；
2. 调用 apiservicerest.NewRESTStorage 为 APIServices 资源创建 RESTStorage，RESTStorage 的目的是将每种资源的访问路径及其后端存储的操作对应起来；
3. 调用 s.GenericAPIServer.InstallAPIGroup 为 APIGroup 注册路由信息。
// NewWithDelegate returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {// 初始化genericServergenericServer, err : c.GenericConfig.New(kube-aggregator, delegationTarget)if err ! nil {return nil, err}apiregistrationClient, err : clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)if err ! nil {return nil, err}informerFactory : informers.NewSharedInformerFactory(apiregistrationClient,5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.)// apiServiceRegistrationControllerInitiated 在 APIServiceRegistrationController 完成“installing”所有已知 API 服务时关闭。// 此时我们知道代理proxy APIServices 可以处理客户端请求。在它可能导致 404 响应出现之前这可能会对某些控制器如 GC 和 NS产生严重后果// APIServiceRegistrationController 在执行其工作之前会等待 APIServiceInformer 同步。apiServiceRegistrationControllerInitiated : make(chan struct{})if err : genericServer.RegisterMuxAndDiscoveryCompleteSignal(APIServiceRegistrationControllerInitiated, apiServiceRegistrationControllerInitiated); err ! nil {return nil, err}s : APIAggregator{GenericAPIServer: genericServer,delegateHandler: delegationTarget.UnprotectedHandler(),proxyTransport: c.ExtraConfig.ProxyTransport,proxyHandlers: map[string]*proxyHandler{},handledGroups: sets.String{},lister: informerFactory.Apiregistration().V1().APIServices().Lister(),APIRegistrationInformers: informerFactory,serviceResolver: c.ExtraConfig.ServiceResolver,openAPIConfig: c.GenericConfig.OpenAPIConfig,openAPIV3Config: c.GenericConfig.OpenAPIV3Config,egressSelector: c.GenericConfig.EgressSelector,proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,}// 稍后过滤已过期的资源resourceExpirationEvaluator, err : genericapiserver.NewResourceExpirationEvaluator(*c.GenericConfig.Version)if err ! 
nil {return nil, err}// 为API注册路由apiGroupInfo : apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))if err : s.GenericAPIServer.InstallAPIGroup(apiGroupInfo); err ! nil {return nil, err}enabledVersions : sets.NewString()for v : range apiGroupInfo.VersionedResourcesStorageMap {enabledVersions.Insert(v)}if !enabledVersions.Has(v1.SchemeGroupVersion.Version) {return nil, fmt.Errorf(API group/version %s must be enabled, v1.SchemeGroupVersion.String())}// 初始化 apiserviceRegistrationController、availableControllerapisHandler : apisHandler{codecs: aggregatorscheme.Codecs,lister: s.lister,discoveryGroup: discoveryGroup(enabledVersions),}if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {apisHandlerWithAggregationSupport : aggregated.WrapAggregatedDiscoveryToHandler(apisHandler, s.GenericAPIServer.AggregatedDiscoveryGroupManager)s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(/apis, apisHandlerWithAggregationSupport)} else {s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(/apis, apisHandler)}s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(/apis/, apisHandler)apiserviceRegistrationController : NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)if len(c.ExtraConfig.ProxyClientCertFile) 0 len(c.ExtraConfig.ProxyClientKeyFile) 0 {aggregatorProxyCerts, err : dynamiccertificates.NewDynamicServingContentFromFiles(aggregator-proxy-cert, c.ExtraConfig.ProxyClientCertFile, c.ExtraConfig.ProxyClientKeyFile)if err ! nil {return nil, err}// We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the// context is not used at all. So passing a empty context shouldnt be a problemctx : context.TODO()if err : aggregatorProxyCerts.RunOnce(ctx); err ! 
nil {return nil, err}aggregatorProxyCerts.AddListener(apiserviceRegistrationController)s.proxyCurrentCertKeyContent aggregatorProxyCerts.CurrentCertKeyContents.GenericAPIServer.AddPostStartHookOrDie(aggregator-reload-proxy-client-cert, func(postStartHookContext genericapiserver.PostStartHookContext) error {// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver// TODO: See if we can pass ctx to the current methodctx, cancel : context.WithCancel(context.Background())go func() {select {case -postStartHookContext.StopCh:cancel() // stopCh closed, so cancel our contextcase -ctx.Done():}}()go aggregatorProxyCerts.Run(ctx, 1)return nil})}availableController, err : statuscontrollers.NewAvailableConditionController(informerFactory.Apiregistration().V1().APIServices(),c.GenericConfig.SharedInformerFactory.Core().V1().Services(),c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),apiregistrationClient.ApiregistrationV1(),c.ExtraConfig.ProxyTransport,(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),s.serviceResolver,c.GenericConfig.EgressSelector,)if err ! 
nil {return nil, err}// 添加 PostStartHooks.GenericAPIServer.AddPostStartHookOrDie(start-kube-aggregator-informers, func(context genericapiserver.PostStartHookContext) error {informerFactory.Start(context.StopCh)c.GenericConfig.SharedInformerFactory.Start(context.StopCh)return nil})s.GenericAPIServer.AddPostStartHookOrDie(apiservice-registration-controller, func(context genericapiserver.PostStartHookContext) error {go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)select {case -context.StopCh:case -apiServiceRegistrationControllerInitiated:}return nil})s.GenericAPIServer.AddPostStartHookOrDie(apiservice-status-available-controller, func(context genericapiserver.PostStartHookContext) error {// if we end up blocking for long periods of time, we may need to increase workers.go availableController.Run(5, context.StopCh)return nil})if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {// Spawn a goroutine in aggregator apiserver to update storage version for// all built-in resourcess.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {// Wait for apiserver-identity to exist first before updating storage// versions, to avoid storage version GC accidentally garbage-collecting// storage versions.kubeClient, err : kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err ! nil {return err}if err : wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {_, err : kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})if apierrors.IsNotFound(err) {return false, nil}if err ! nil {return false, err}return true, nil}, hookContext.StopCh); err ! 
nil {return fmt.Errorf(failed to wait for apiserver-identity lease %s to be created: %v,s.GenericAPIServer.APIServerID, err)}// Technically an apiserver only needs to update storage version once during bootstrap.// Reconcile StorageVersion objects every 10 minutes will help in the case that the// StorageVersion objects get accidentally modified/deleted by a different agent. In that// case, the reconciliation ensures future storage migration still works. If nothing gets// changed, the reconciliation update is a noop and gets short-circuited by the apiserver,// therefore wont change the resource version and trigger storage migration.go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)// share the same generic apiserver config. The same StorageVersion manager is used// to register all built-in resources when the generic apiservers install APIs.s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)return false, nil}, hookContext.StopCh)// Once the storage version updater finishes the first round of update,// the PostStartHook will return to unblock /healthz. The handler chain// wont block write requests anymore. Check every second since its not// expensive.wait.PollImmediateUntil(1*time.Second, func() (bool, error) {return s.GenericAPIServer.StorageVersionManager.Completed(), nil}, hookContext.StopCh)return nil})}return s, nil
}基本调用链如下 |-- CreateNodeDialer||-- CreateKubeAPIServerConfig|
CreateServerChain --|--> createAPIExtensionsConfig
                    |
                    |                                                                       |--> c.GenericConfig.New
                    |--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --|
                    |                                                                       |--> s.GenericAPIServer.InstallAPIGroup
                    |
                    |                                                                 |--> c.GenericConfig.New --> legacyRESTStorageProvider.NewLegacyRESTStorage
                    |                                                                 |
                    |--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI
                    |                                                                 |
                    |                                                                 |--> m.InstallAPIs
                    |
                    |--> createAggregatorConfig
                    |
                    |                                                                             |--> c.GenericConfig.New
                    |                                                                             |
                    |--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage
                                                                                                  |
                                                                                                  |--> s.GenericAPIServer.InstallAPIGroup

3.3.2 prepared.Run
在 Run 方法中，首先调用 CreateServerChain 完成各 server 的初始化，然后调用 server.PrepareRun 完成服务启动前的准备工作，最后调用 prepared.Run 方法来启动安全的 http server。server.PrepareRun 主要完成了健康检查、存活检查和 OpenAPI 路由的注册工作。
// PrepareRun 通过设置 OpenAPI spec和aggregated discovery document 来准备aggregator 运行。
聚合发现文档并调用通用 PrepareRun。
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {// add post start hook before generic PrepareRun in order to be before /healthz installationif s.openAPIConfig ! nil {s.GenericAPIServer.AddPostStartHookOrDie(apiservice-openapi-controller, func(context genericapiserver.PostStartHookContext) error {go s.openAPIAggregationController.Run(context.StopCh)return nil})}if s.openAPIV3Config ! nil utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {s.GenericAPIServer.AddPostStartHookOrDie(apiservice-openapiv3-controller, func(context genericapiserver.PostStartHookContext) error {go s.openAPIV3AggregationController.Run(context.StopCh)return nil})}if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {s.discoveryAggregationController NewDiscoveryManager(s.GenericAPIServer.AggregatedDiscoveryGroupManager,)// 启动discovery 端点s.GenericAPIServer.AddPostStartHookOrDie(apiservice-discovery-controller, func(context genericapiserver.PostStartHookContext) error {// Run discovery managers worker to watch for new/removed/updated// APIServices to the discovery document can be updated at runtimego s.discoveryAggregationController.Run(context.StopCh)return nil})}prepared : s.GenericAPIServer.PrepareRun()// 延迟OpenAPI直到delegate启动他们的handlers启动后再启动if s.openAPIConfig ! nil {specDownloader : openapiaggregator.NewDownloader()openAPIAggregator, err : openapiaggregator.BuildAndRegisterAggregator(specDownloader,s.GenericAPIServer.NextDelegate(),s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),s.openAPIConfig,s.GenericAPIServer.Handler.NonGoRestfulMux)if err ! nil {return preparedAPIAggregator{}, err}s.openAPIAggregationController openapicontroller.NewAggregationController(specDownloader, openAPIAggregator)}if s.openAPIV3Config ! 
nil utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {specDownloaderV3 : openapiv3aggregator.NewDownloader()openAPIV3Aggregator, err : openapiv3aggregator.BuildAndRegisterAggregator(specDownloaderV3,s.GenericAPIServer.NextDelegate(),s.GenericAPIServer.Handler.NonGoRestfulMux)if err ! nil {return preparedAPIAggregator{}, err}s.openAPIV3AggregationController openapiv3controller.NewAggregationController(openAPIV3Aggregator)}return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}func (s preparedAPIAggregator) Run(stopCh -chan struct{}) error {return s.runnable.Run(stopCh)
}