KubeSphere DevOps代码阅读笔记

Posted by 爱折腾的工程师 on Tuesday, September 28, 2021

1. KubeSphere DevOps简介

KubeSphere DevOps提供基于Jenkins的CI/CD流水线,支持自动化工作流,包括Binary-to-Image (B2I)和Source-to-Image(S2I)等, 帮助不同的组织加快产品上市时间。

  • 集成Jenkins:KubeSphere DevOps 系统内置了 Jenkins 作为引擎,支持多种第三方插件。 此外,Jenkins 为扩展开发提供了良好的环境,DevOps 团队的整个工作流程可以在统一的平台上无缝对接,包括开发测试、构建部署、监控日志和通知等。 KubeSphere的帐户可以用登录内置的Jenkins,满足企业对于 CI/CD 流水线和统一认证多租户隔离的需求。

  • 便捷的内置工具:无需对Docker或Kubernetes的底层运作原理有深刻的了解,用户即可快速上手自动化工具,包括Binary-to-Image和Source-to-Image。 只需定义镜像仓库地址,上传二进制文件(例如JAR/WAR/Binary),即可将对应的服务自动发布至Kubernetes,无需编写Dockerfile。

2. KubeSphere DevOps使用方式

2.1 创建流水线

支持Jenkinsfile和图形化界面两种方式构建流水线:

2.2 工具集成

3. KubeSphere架构

3.1 系统架构

KubeSphere将前端与后端分开,实现了面向云原生的设计,后端的各个功能组件可通过 REST API 对接外部系统。 可参考API文档。下图是系统架构图。KubeSphere无底层的基础设施依赖,可以运行在任何Kubernetes、私有云、公有云、VM或物理环境(BM)之上。此外,它可以部署在任何Kubernetes发行版上。

组件列表:

后端组件 功能说明
ks-apiserver 整个集群管理的 API 接口和集群内部各个模块之间通信的枢纽,以及集群安全控制。
ks-console 提供 KubeSphere 的控制台服务。
ks-controller-manager 实现业务逻辑的,例如创建企业空间时,为其创建对应的权限;或创建服务策略时,生成对应的 Istio 配置等。
metrics-server Kubernetes 的监控组件,从每个节点的 Kubelet 采集指标信息。
Prometheus 提供集群,节点,工作负载,API对象的监视指标和服务。
Elasticsearch 提供集群的日志索引、查询、数据管理等服务,在安装时也可对接您已有的 ES 减少资源消耗。
Fluent Bit 提供日志接收与转发,可将采集到的⽇志信息发送到 ElasticSearch、Kafka。
Jenkins 提供 CI/CD 流水线服务。
SonarQube 可选安装项,提供代码静态检查与质量分析。
Source-to-Image 将源代码自动将编译并打包成 Docker 镜像,方便快速构建镜像。
Istio 提供微服务治理与流量管控,如灰度发布、金丝雀发布、熔断、流量镜像等。
Jaeger 收集 Sidecar 数据,提供分布式 Tracing 服务。
OpenPitrix 提供应用程序生命周期管理,例如应用模板、应用部署与管理的服务等。
Alert 提供集群、Workload、Pod、容器级别的自定义告警服务。
Notification 是一项综合通知服务; 它当前支持邮件传递方法。
Redis 将 ks-console 与 ks-account 的数据存储在内存中的存储系统。
MySQL 集群后端组件的数据库,监控、告警、DevOps、OpenPitrix 共用 MySQL 服务。
PostgreSQL SonarQube 和 Harbor 的后端数据库。
OpenLDAP 负责集中存储和管理用户帐户信息与对接外部的 LDAP。
Storage 内置 CSI 插件对接云平台存储服务,可选安装开源的 NFS/Ceph/Gluster 的客户端。
Network 可选安装 Calico/Flannel 等开源的网络插件,支持对接云平台 SDN。

跟DevOps相关的组件有:Jenkins、SonarQube、Source-to-Image、Storage(对象存储存放CI制品)

3.2 API架构

KubeSphere API服务器为API对象验证和配置数据。API服务器为REST操作提供服务,并为集群的共享状态提供前端,其他所有组件通过它进行交互。 其中/kapi和/kapis是KubeSphere拓展聚合的API,/api和/apis开头的都属于Kubernetes原生的API, KubeSphere把用户对原生Kubernetes资源的请求通过API Server转发到Kubernetes API Server对原生资源进行操作和管理。

DevOps API也是以/kapi和/kapis开头,ks-apiserver是API入口,如果发现是DevOps API请求,也会相继经过认证、鉴权、审计; 通过API路由转发到后端的devops-apiserver,进而操作对应的DevOps CR资源,devops-controller监听对应的DevOps CR资源事件,触发对应的处理逻辑(操作jenkins)

4. 代码分析

KubeSphere版本号:735f0c77310fec0b9a17550844ad098f2ef387d5(commit-id)

4.1 ks-apiserver

ks-devops-apiserver目前并不是独立的apiserver,所有请求入口还是在ks-apiserver,所以先来看下ks-apiserver

APIServer结构体

type APIServer struct {
    // number of kubesphere apiserver
    //kubersphere apiserver运行数量
    ServerCount int
    //http server运行监听地址,端口之类
    Server *http.Server
    //kubesphere各组件适配器运行参数
    Config *apiserverconfig.Config

    // webservice container, where all webservice defines
    //go-restful web API框架中的container结构体,其它所有的web路由服务都放在container中
    container *restful.Container

    // kubeClient is a collection of all kubernetes(include CRDs) objects clientset
    //聚合型的ClientSet接口实现
    KubernetesClient k8s.Client

    // informerFactory is a collection of all kubernetes(include CRDs) objects informers,
    // mainly for fast query
    //聚合类型的informer工厂接口实现
    InformerFactory informers.InformerFactory

    // cache is used for short lived objects, like session
    //缓存Interface接口实现
    CacheClient cache.Interface

    // monitoring client set
    //monitoring Interface接口实现
    MonitoringClient monitoring.Interface
    //monitoring Interface接口实现
    MetricsClient monitoring.Interface
    //logging Client接口实现
    LoggingClient logging.Client
    //devops Interface接口实现
    DevopsClient devops.Interface
    //s3 Interface接口实现
    S3Client s3.Interface
    //sonarqube SonarInterface接口实现
    SonarClient sonarqube.SonarInterface
    //events Client接口实现
    EventsClient events.Client
    //auditing Client接口实现
    AuditingClient auditing.Client
    //alerting RuleClient接口实现
    AlertingClient alerting.RuleClient

    // controller-runtime cache
    //controller-runtime Cache接口实现
    RuntimeCache runtimecache.Cache

    // entity that issues tokens
    //token Issuer接口实现
    Issuer token.Issuer

    // controller-runtime client
    //controller-runtime Client接口实现
    RuntimeClient runtimeclient.Client
}

ServerRunOptions结构体

api-server启动加载的运行参数对应的结构体

type ServerRunOptions struct {
    //配置文件
    ConfigFile              string
    //api-server启动监听的地址,secure/insecure端口以及tls证书
    GenericServerRunOptions *genericoptions.ServerRunOptions
    //各个组件模块的配置参数
    *apiserverconfig.Config

    //调试模式
    DebugMode bool
}

Config结构体

汇聚各个组件适配器模块的配置参数,如DevOps、SonarQube、ServiceMesh、OpenPitrix等

// Config defines everything needed for apiserver to deal with external services
type Config struct {
    //DevOps适配器配置参数
    DevopsOptions         *jenkins.Options        `json:"devops,omitempty" yaml:"devops,omitempty" mapstructure:"devops"`
    //SonarQube适配器配置参数
    SonarQubeOptions      *sonarqube.Options      `json:"sonarqube,omitempty" yaml:"sonarQube,omitempty" mapstructure:"sonarqube"`
    //Kubernetes适配器配置参数
    KubernetesOptions     *k8s.KubernetesOptions  `json:"kubernetes,omitempty" yaml:"kubernetes,omitempty" mapstructure:"kubernetes"`
    //ServiceMesh适配器配置参数
    ServiceMeshOptions    *servicemesh.Options    `json:"servicemesh,omitempty" yaml:"servicemesh,omitempty" mapstructure:"servicemesh"`
    //Network适配器配置参数
    NetworkOptions        *network.Options        `json:"network,omitempty" yaml:"network,omitempty" mapstructure:"network"`
    //Ldap适配器配置参数
    LdapOptions           *ldap.Options           `json:"-,omitempty" yaml:"ldap,omitempty" mapstructure:"ldap"`
    //Redis适配器配置参数
    RedisOptions          *cache.Options          `json:"redis,omitempty" yaml:"redis,omitempty" mapstructure:"redis"`
    //S3适配器配置参数
    S3Options             *s3.Options             `json:"s3,omitempty" yaml:"s3,omitempty" mapstructure:"s3"`
    //OpenPitrix适配器配置参数
    OpenPitrixOptions     *openpitrix.Options     `json:"openpitrix,omitempty" yaml:"openpitrix,omitempty" mapstructure:"openpitrix"`
    //Monitoring适配器配置参数
    MonitoringOptions     *prometheus.Options     `json:"monitoring,omitempty" yaml:"monitoring,omitempty" mapstructure:"monitoring"`
    //Logging适配器配置参数
    LoggingOptions        *logging.Options        `json:"logging,omitempty" yaml:"logging,omitempty" mapstructure:"logging"`
    //Authentication适配器配置参数
    AuthenticationOptions *authentication.Options `json:"authentication,omitempty" yaml:"authentication,omitempty" mapstructure:"authentication"`
    //Authorization适配器配置参数
    AuthorizationOptions  *authorization.Options  `json:"authorization,omitempty" yaml:"authorization,omitempty" mapstructure:"authorization"`
    //MultiCluster适配器配置参数
    MultiClusterOptions   *multicluster.Options   `json:"multicluster,omitempty" yaml:"multicluster,omitempty" mapstructure:"multicluster"`
    //Events适配器配置参数
    EventsOptions         *events.Options         `json:"events,omitempty" yaml:"events,omitempty" mapstructure:"events"`
    //Auditing适配器配置参数
    AuditingOptions       *auditing.Options       `json:"auditing,omitempty" yaml:"auditing,omitempty" mapstructure:"auditing"`
    //Alerting适配器配置参数
    AlertingOptions       *alerting.Options       `json:"alerting,omitempty" yaml:"alerting,omitempty" mapstructure:"alerting"`
    //Notification适配器配置参数
    NotificationOptions   *notification.Options   `json:"notification,omitempty" yaml:"notification,omitempty" mapstructure:"notification"`
    //KubeEdge适配器配置参数
    KubeEdgeOptions       *kubeedge.Options       `json:"kubeedge,omitempty" yaml:"kubeedge,omitempty" mapstructure:"kubeedge"`
    //Metering适配器配置参数
    MeteringOptions       *metering.Options       `json:"metering,omitempty" yaml:"metering,omitempty" mapstructure:"metering"`
    //Gateway适配器配置参数
    GatewayOptions        *gateway.Options        `json:"gateway,omitempty" yaml:"gateway,omitempty" mapstructure:"gateway"`
    //GPU适配器配置参数
    GPUOptions            *gpu.Options            `json:"gpu,omitempty" yaml:"gpu,omitempty" mapstructure:"gpu"`
}

ServerRunOptions结构体

type ServerRunOptions struct {
    //配置文件
    ConfigFile              string
    api-server本身运行相关参数
    GenericServerRunOptions *genericoptions.ServerRunOptions
    //各个组件适配器模块的配置参数
    *apiserverconfig.Config

    //调试模式
    DebugMode bool
}

api-server本身运行相关参数

type ServerRunOptions struct {
    //server监听地址
    BindAddress string
    //监听非安全端口
    InsecurePort int
    //监听安全端口
    SecurePort int
    //tls cert文件
    TlsCertFile string
    //tls私钥文件
    TlsPrivateKey string
}

Client接口

实现Client接口就能成为一个聚合型的ClientSet

type Client interface {
    //原生的k8s ClientSet
    Kubernetes() kubernetes.Interface
    //KubeSphere ClientSet
    KubeSphere() kubesphere.Interface
    //istio ClientSet
    Istio() istioclient.Interface
    //csi snapshot ClientSet
    Snapshot() snapshotclient.Interface
    //ApiExtensions ClientSet
    ApiExtensions() apiextensionsclient.Interface
    //原生的discoveryClient
    Discovery() discovery.DiscoveryInterface
    //Prometheus ClientSet
    Prometheus() promresourcesclient.Interface
    //K8s master地址
    Master() string
    //ClientSet配置参数
    Config() *rest.Config
}

InformerFactory接口

聚合类型的informer工厂接口,内部也是各类组件的共享informer工厂接口

// InformerFactory is a group all shared informer factories which kubesphere needed
// callers should check if the return value is nil
type InformerFactory interface {
    //k8s原生共享informer工厂
    KubernetesSharedInformerFactory() k8sinformers.SharedInformerFactory
    //kuberSphere共享informer工厂
    KubeSphereSharedInformerFactory() ksinformers.SharedInformerFactory
    //istio共享informer工厂
    IstioSharedInformerFactory() istioinformers.SharedInformerFactory
    //csi snapshot共享informer工厂
    SnapshotSharedInformerFactory() snapshotinformer.SharedInformerFactory
    //apiextension共享informer工厂
    ApiExtensionSharedInformerFactory() apiextensionsinformers.SharedInformerFactory
    //prometheus共享informer工厂
    PrometheusSharedInformerFactory() prominformers.SharedInformerFactory

    // Start shared informer factory one by one if they are not nil
    Start(stopCh <-chan struct{})
}

monitoring Interface接口

监控接口

type Interface interface {
    //获取Metric
    GetMetric(expr string, time time.Time) Metric
    //根据起始时间和结束时间获取Metric
    GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric
    //获取指定名称的Metric
    GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric
    //根据起始时间和结束时间获取指定名称的Metric
    GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric
    //获取Metric元数据
    GetMetadata(namespace string) []Metadata
    //根据Label正则匹配获取Metric
    GetMetricLabelSet(expr string, start, end time.Time) []map[string]string

    //根据meter获取Metric
    GetNamedMeters(meters []string, time time.Time, opts []QueryOption) []Metric
    //根据起始时间和结束时间,meter获取Metric
    GetNamedMetersOverTime(metrics []string, start, end time.Time, step time.Duration, opts []QueryOption) []Metric
}

logging Client接口

日志接口

type Client interface {
    //获取当前统计值
    GetCurrentStats(sf SearchFilter) (Statistics, error)
    //根据间隔统计日志
    CountLogsByInterval(sf SearchFilter, interval string) (Histogram, error)
    //查询日志
    SearchLogs(sf SearchFilter, from, size int64, order string) (Logs, error)
    //导出日志
    ExportLogs(sf SearchFilter, w io.Writer) error
}

s3 Interface接口

s3对象存储接口

type Interface interface {
    //根据key读取对象数据
    //read the content, caller should close the io.ReadCloser.
    Read(key string) ([]byte, error)
    //上传文件
    // Upload uploads a object to storage and returns object location if succeeded
    Upload(key, fileName string, body io.Reader, size int) error
    //获取下载链接
    GetDownloadURL(key string, fileName string) (string, error)
    //根据key删除对象
    // Delete deletes an object by its key
    Delete(key string) error
}

devops Interface接口

devops接口,嵌套接口

type Interface interface {
    //凭证相关操作
    CredentialOperator
    //获取Pipeline job,构建过程
    BuildGetter
    //Pipeline相关操作
    PipelineOperator
    //项目pipeline相关操作
    ProjectPipelineOperator
    //DevOps项目相关操作
    ProjectOperator
    //DevOps项目角色相关操作
    RoleOperator
}

CredentialOperator接口

type CredentialOperator interface {
    //创建凭证
    CreateCredentialInProject(projectId string, credential *v1.Secret) (string, error)
    //更新凭证
    UpdateCredentialInProject(projectId string, credential *v1.Secret) (string, error)
    //获取凭证
    GetCredentialInProject(projectId, id string) (*Credential, error)
    //删除凭证
    DeleteCredentialInProject(projectId, id string) (string, error)
}

BuildGetter接口

type BuildGetter interface {
    //根据projectId,pipelineId获取pipeline job, 构建过程
    // GetProjectPipelineBuildByType get the last build of the pipeline, status can specify the status of the last build.
    GetProjectPipelineBuildByType(projectId, pipelineId string, status string) (*Build, error)
    //根据projectId,pipelineId,branch获取pipeline job, 构建过程
    // GetMultiBranchPipelineBuildByType get the last build of the pipeline, status can specify the status of the last build.
    GetMultiBranchPipelineBuildByType(projectId, pipelineId, branch string, status string) (*Build, error)
}

PipelineOperator接口

type PipelineOperator interface {
    //获取Pipeline
    // Pipelinne operator interface
    GetPipeline(projectName, pipelineName string, httpParameters *HttpParameters) (*Pipeline, error)
    //获取Pipeline列表
    ListPipelines(httpParameters *HttpParameters) (*PipelineList, error)
    //获取正在运行的Pipeline
    GetPipelineRun(projectName, pipelineName, runId string, httpParameters *HttpParameters) (*PipelineRun, error)
    //获取正在运行的Pipeline列表
    ListPipelineRuns(projectName, pipelineName string, httpParameters *HttpParameters) (*PipelineRunList, error)
    //停止Pipeline
    StopPipeline(projectName, pipelineName, runId string, httpParameters *HttpParameters) (*StopPipeline, error)
    //重新运行Pipeline
    ReplayPipeline(projectName, pipelineName, runId string, httpParameters *HttpParameters) (*ReplayPipeline, error)
    //运行Pipeline
    RunPipeline(projectName, pipelineName string, httpParameters *HttpParameters) (*RunPipeline, error)
    //获取制品列表
    GetArtifacts(projectName, pipelineName, runId string, httpParameters *HttpParameters) ([]Artifacts, error)
    //获取正在运行的job日志
    GetRunLog(projectName, pipelineName, runId string, httpParameters *HttpParameters) ([]byte, error)
    //获取正在运行的job的步骤日志
    GetStepLog(projectName, pipelineName, runId, nodeId, stepId string, httpParameters *HttpParameters) ([]byte, http.Header, error)
    //获取节点步骤列表
    GetNodeSteps(projectName, pipelineName, runId, nodeId string, httpParameters *HttpParameters) ([]NodeSteps, error)
    //获取在运行pipeline的节点列表
    GetPipelineRunNodes(projectName, pipelineName, runId string, httpParameters *HttpParameters) ([]PipelineRunNodes, error)
    //添加pipelie步骤
    SubmitInputStep(projectName, pipelineName, runId, nodeId, stepId string, httpParameters *HttpParameters) ([]byte, error)

    //BranchPipelinne operator interface
    //获取某个分支的pipeline
    GetBranchPipeline(projectName, pipelineName, branchName string, httpParameters *HttpParameters) (*BranchPipeline, error)
    //获取正在运行的某个分支的pipeline
    GetBranchPipelineRun(projectName, pipelineName, branchName, runId string, httpParameters *HttpParameters) (*PipelineRun, error)
    //停止某个分支的pipeline
    StopBranchPipeline(projectName, pipelineName, branchName, runId string, httpParameters *HttpParameters) (*StopPipeline, error)
    //重运行某个分支的pipeline
    ReplayBranchPipeline(projectName, pipelineName, branchName, runId string, httpParameters *HttpParameters) (*ReplayPipeline, error)
    //运行某个分支的pipeline
    RunBranchPipeline(projectName, pipelineName, branchName string, httpParameters *HttpParameters) (*RunPipeline, error)
    //获取某个分支的制品列表
    GetBranchArtifacts(projectName, pipelineName, branchName, runId string, httpParameters *HttpParameters) ([]Artifacts, error)
    //获取正在运行的某个分支的pipeline日志
    GetBranchRunLog(projectName, pipelineName, branchName, runId string, httpParameters *HttpParameters) ([]byte, error)
    //获取正在运行的某个分支的pipeline步骤日志
    GetBranchStepLog(projectName, pipelineName, branchName, runId, nodeId, stepId string, httpParameters *HttpParameters) ([]byte, http.Header, error)
    //获取节点正在运行的某个分支的pipeline步骤列表
    GetBranchNodeSteps(projectName, pipelineName, branchName, runId, nodeId string, httpParameters *HttpParameters) ([]NodeSteps, error)
    //获取正在运行的某个分支的节点列表
    GetBranchPipelineRunNodes(projectName, pipelineName, branchName, runId string, httpParameters *HttpParameters) ([]BranchPipelineRunNodes, error)
    //添加分支步骤
    SubmitBranchInputStep(projectName, pipelineName, branchName, runId, nodeId, stepId string, httpParameters *HttpParameters) ([]byte, error)
    //获取Pipeline某个分支
    GetPipelineBranch(projectName, pipelineName string, httpParameters *HttpParameters) (*PipelineBranch, error)
    //扫描分支
    ScanBranch(projectName, pipelineName string, httpParameters *HttpParameters) ([]byte, error)

    // Common pipeline operator interface
    //获取控制台日志
    GetConsoleLog(projectName, pipelineName string, httpParameters *HttpParameters) ([]byte, error)
    //获取csrftoken
    GetCrumb(httpParameters *HttpParameters) (*Crumb, error)

    // SCM operator interface
    //获取版本控制系统server列表
    GetSCMServers(scmId string, httpParameters *HttpParameters) ([]SCMServer, error)
    //获取版本控制系统org列表
    GetSCMOrg(scmId string, httpParameters *HttpParameters) ([]SCMOrg, error)
    //获取版本控制系统org的Repositories
    GetOrgRepo(scmId, organizationId string, httpParameters *HttpParameters) (OrgRepo, error)
    //添加版本控制系统server
    CreateSCMServers(scmId string, httpParameters *HttpParameters) (*SCMServer, error)
    //校验版本控制系统server
    Validate(scmId string, httpParameters *HttpParameters) (*Validates, error)

    //Webhook operator interface
    //获取通知的commitId
    GetNotifyCommit(httpParameters *HttpParameters) ([]byte, error)
    //获取webhook地址
    GithubWebhook(httpParameters *HttpParameters) ([]byte, error)
    //检测job脚本编译情况
    CheckScriptCompile(projectName, pipelineName string, httpParameters *HttpParameters) (*CheckScript, error)
    //检测job定时触发器
    CheckCron(projectName string, httpParameters *HttpParameters) (*CheckCronRes, error)
    //获取Pipeline jenkinsfile
    ToJenkinsfile(httpParameters *HttpParameters) (*ResJenkinsfile, error)
    //获取Pipeline json
    ToJson(httpParameters *HttpParameters) (map[string]interface{}, error)
}

ProjectPipelineOperator接口

type ProjectPipelineOperator interface {
    //创建DevOps项目pipeline
    CreateProjectPipeline(projectId string, pipeline *v1alpha3.Pipeline) (string, error)
    //删除DevOps项目pipeline
    DeleteProjectPipeline(projectId string, pipelineId string) (string, error)
    //更新DevOps项目pipeline
    UpdateProjectPipeline(projectId string, pipeline *v1alpha3.Pipeline) (string, error)
    //获取DevOps项目pipeline配置
    GetProjectPipelineConfig(projectId, pipelineId string) (*v1alpha3.Pipeline, error)
}

ProjectOperator接口

type ProjectOperator interface {
    //创建DevOps项目
    CreateDevOpsProject(projectId string) (string, error)
    //删除DevOps项目
    DeleteDevOpsProject(projectId string) error
    //获取DevOps项目
    GetDevOpsProject(projectId string) (string, error)
}

RoleOperator接口

type RoleOperator interface {
    //创建全局角色
    AddGlobalRole(roleName string, ids GlobalPermissionIds, overwrite bool) error
    //获取全局角色
    GetGlobalRole(roleName string) (string, error)
    //创建DevOps项目角色
    AddProjectRole(roleName string, pattern string, ids ProjectPermissionIds, overwrite bool) error
    //删除DevOps项目角色
    DeleteProjectRoles(roleName ...string) error
    //为用户分配DevOps项目角色
    AssignProjectRole(roleName string, sid string) error
    //取消用户DevOps项目角色
    UnAssignProjectRole(roleName string, sid string) error
    //为用户分配全局角色
    AssignGlobalRole(roleName string, sid string) error
    //取消用户全局角色
    UnAssignGlobalRole(roleName string, sid string) error
    //删除DevOps项目的用户
    DeleteUserInProject(sid string) error
}

cache Interface接口

缓存接口

type Interface interface {
    // Keys retrieves all keys match the given pattern
    //获取key列表
    Keys(pattern string) ([]string, error)

    // Get retrieves the value of the given key, return error if key doesn't exist
    //根据key获取缓存
    Get(key string) (string, error)

    // Set sets the value and living duration of the given key, zero duration means never expire
    //设置缓存
    Set(key string, value string, duration time.Duration) error

    // Del deletes the given key, no error returned if the key doesn't exists
    //删除缓存
    Del(keys ...string) error

    // Exists checks the existence of a give key
    //判断缓存是否存在
    Exists(keys ...string) (bool, error)

    // Expires updates object's expiration time, return err if key doesn't exist
    //更新缓存过期时间
    Expire(key string, duration time.Duration) error
}

events Client接口

type Client interface {
    //查询事件
    SearchEvents(filter *Filter, from, size int64, sort string) (*Events, error)
    //按一段时间统计事件
    CountOverTime(filter *Filter, interval string) (*Histogram, error)
    //统计有多少种资源,多少个事件
    StatisticsOnResources(filter *Filter) (*Statistics, error)
}

auditing Client接口

type Client interface {
    //查询审计事件
    SearchAuditingEvent(filter *Filter, from, size int64, sort string) (*Events, error)
    //按一段时间统计事件
    CountOverTime(filter *Filter, interval string) (*Histogram, error)
    //统计有多少种资源,多少个事件
    StatisticsOnResources(filter *Filter) (*Statistics, error)
}

alerting RuleClient接口

type RuleClient interface {
    //获取Prometheus告警规则列表
    PrometheusRules(ctx context.Context) ([]*RuleGroup, error)
    //获取Thanos告警规则列表
    ThanosRules(ctx context.Context) ([]*RuleGroup, error)
}

token Issuer接口

// Issuer issues token to user, tokens are required to perform mutating requests to resources
type Issuer interface {
    // IssueTo issues a token a User, return error if issuing process failed
    //为某个用户发布token
    IssueTo(request *IssueRequest) (string, error)

    // Verify verifies a token, and return a user info if it's a valid token, otherwise return error
    //校验token
    Verify(string) (*VerifiedResponse, error)

    // Keys hold encryption and signing keys.
    //获取公钥私钥信息
    Keys() *Keys
}

IdentityManagementInterface接口

认证管理接口

type IdentityManagementInterface interface {
    //创建用户
    CreateUser(user *iamv1alpha2.User) (*iamv1alpha2.User, error)
    //获取用户列表
    ListUsers(query *query.Query) (*api.ListResult, error)
    //删除用户
    DeleteUser(username string) error
    //更新用户
    UpdateUser(user *iamv1alpha2.User) (*iamv1alpha2.User, error)
    //获取用户
    DescribeUser(username string) (*iamv1alpha2.User, error)
    //修改用户密码
    ModifyPassword(username string, password string) error
    //获取用户登录记录
    ListLoginRecords(username string, query *query.Query) (*api.ListResult, error)
    //校验用户密码
    PasswordVerify(username string, password string) error
}

AccessManagementInterface接口

权限管理接口

type AccessManagementInterface interface {
    //根据用户获取全局角色
    GetGlobalRoleOfUser(username string) (*iamv1alpha2.GlobalRole, error)
    //根据用户获取工作空间角色
    GetWorkspaceRoleOfUser(username string, groups []string, workspace string) ([]*iamv1alpha2.WorkspaceRole, error)
    //根据用户获取集群角色
    GetClusterRoleOfUser(username string) (*rbacv1.ClusterRole, error)
    //根据用户获取命名空间角色
    GetNamespaceRoleOfUser(username string, groups []string, namespace string) ([]*rbacv1.Role, error)
    //获取角色列表
    ListRoles(namespace string, query *query.Query) (*api.ListResult, error)
    //获取集群角色列表
    ListClusterRoles(query *query.Query) (*api.ListResult, error)
    //获取工作空间角色列表
    ListWorkspaceRoles(query *query.Query) (*api.ListResult, error)
    //获取全局角色列表
    ListGlobalRoles(query *query.Query) (*api.ListResult, error)
    //获取全局角色授权列表
    ListGlobalRoleBindings(username string) ([]*iamv1alpha2.GlobalRoleBinding, error)
    //获取全局集群角色授权列表
    ListClusterRoleBindings(username string) ([]*rbacv1.ClusterRoleBinding, error)
    //获取工作空间角色授权列表
    ListWorkspaceRoleBindings(username string, groups []string, workspace string) ([]*iamv1alpha2.WorkspaceRoleBinding, error)
    //获取角色授权列表
    ListRoleBindings(username string, groups []string, namespace string) ([]*rbacv1.RoleBinding, error)
    //获取角色策略列表
    GetRoleReferenceRules(roleRef rbacv1.RoleRef, namespace string) (string, []rbacv1.PolicyRule, error)
    //获取全局角色
    GetGlobalRole(globalRole string) (*iamv1alpha2.GlobalRole, error)
    //获取工作空间角色
    GetWorkspaceRole(workspace string, name string) (*iamv1alpha2.WorkspaceRole, error)
    //创建全局角色授权
    CreateGlobalRoleBinding(username string, globalRole string) error
    //创建更新工作空间角色
    CreateOrUpdateWorkspaceRole(workspace string, workspaceRole *iamv1alpha2.WorkspaceRole) (*iamv1alpha2.WorkspaceRole, error)
    //Patch工作空间角色
    PatchWorkspaceRole(workspace string, workspaceRole *iamv1alpha2.WorkspaceRole) (*iamv1alpha2.WorkspaceRole, error)
    //创建更新全局角色
    CreateOrUpdateGlobalRole(globalRole *iamv1alpha2.GlobalRole) (*iamv1alpha2.GlobalRole, error)
    //Patch全局角色
    PatchGlobalRole(globalRole *iamv1alpha2.GlobalRole) (*iamv1alpha2.GlobalRole, error)
    //删除工作空间角色
    DeleteWorkspaceRole(workspace string, name string) error
    //删除全局角色
    DeleteGlobalRole(name string) error
    //创建更新集群角色
    CreateOrUpdateClusterRole(clusterRole *rbacv1.ClusterRole) (*rbacv1.ClusterRole, error)
    //删除集群角色
    DeleteClusterRole(name string) error
    //获取集群角色
    GetClusterRole(name string) (*rbacv1.ClusterRole, error)
    //获取命名空间角色
    GetNamespaceRole(namespace string, name string) (*rbacv1.Role, error)
    //创建更新命名空间角色
    CreateOrUpdateNamespaceRole(namespace string, role *rbacv1.Role) (*rbacv1.Role, error)
    //删除命名空间角色
    DeleteNamespaceRole(namespace string, name string) error
    //创建工作空间用户授权
    CreateUserWorkspaceRoleBinding(username string, workspace string, role string) error
    //移除工作空间用户
    RemoveUserFromWorkspace(username string, workspace string) error
    //创建命名空间角色授权
    CreateNamespaceRoleBinding(username string, namespace string, role string) error
    //移除命名空间用户
    RemoveUserFromNamespace(username string, namespace string) error
    //创建集群角色授权
    CreateClusterRoleBinding(username string, role string) error
    //移除集群用户
    RemoveUserFromCluster(username string) error
    //获取DevOps项目关联的命名空间
    GetDevOpsRelatedNamespace(devops string) (string, error)
    //获取控制命名空间的工作空间
    GetNamespaceControlledWorkspace(namespace string) (string, error)
    //获取控制DevOps项目的工作空间
    GetDevOpsControlledWorkspace(devops string) (string, error)
    //Patch命名空间角色
    PatchNamespaceRole(namespace string, role *rbacv1.Role) (*rbacv1.Role, error)
    //Patch集群角色
    PatchClusterRole(clusterRole *rbacv1.ClusterRole) (*rbacv1.ClusterRole, error)
    //根据工作空间获取角色授权列表
    ListGroupRoleBindings(workspace string, query *query.Query) ([]*rbacv1.RoleBinding, error)
    //创建角色授权
    CreateRoleBinding(namespace string, roleBinding *rbacv1.RoleBinding) (*rbacv1.RoleBinding, error)
    //删除角色授权
    DeleteRoleBinding(namespace, name string) error
    //根据工作空间获取角色授权列表
    ListGroupWorkspaceRoleBindings(workspace string, query *query.Query) (*api.ListResult, error)
    //创建工作空间角色授权
    CreateWorkspaceRoleBinding(workspace string, roleBinding *iamv1alpha2.WorkspaceRoleBinding) (*iamv1alpha2.WorkspaceRoleBinding, error)
    //删除工作空间角色授权
    DeleteWorkspaceRoleBinding(workspaceName, name string) error
}

启动函数

//kubesphere.io/kubesphere/pkg/apiserver/apiserver.go

package main

import (
    "log"

    "kubesphere.io/kubesphere/cmd/ks-apiserver/app"
)

func main() {
    //基于cobra封装的命令行启动,初始化一个cobra.Command结构体指针
    cmd := app.NewAPIServerCommand()
    //加载命令行参数
    if err := cmd.Execute(); err != nil {
        log.Fatalln(err)
    }
}

NewAPIServerCommand函数

初始化cobra.Command结构体指针函数

//kubesphere.io/kubesphere/cmd/ks-apiserver/app/server.go

func NewAPIServerCommand() *cobra.Command {
    //初始化api-server启动运行参数,包含两类参数
    //1.genericoptions.ServerRunOptions结构体
    //2.Config结构体
    s := options.NewServerRunOptions()

    //支持从配置文件中加载运行参数
    conf, err := apiserverconfig.TryLoadFromDisk()
    if err == nil {
        //如果有效的化,重新赋值ServerRunOptions结构体指针
        s = &options.ServerRunOptions{
            GenericServerRunOptions: s.GenericServerRunOptions,
            Config:                  conf,
        }
    } else {
        //日志打印异常
        klog.Fatal("Failed to load configuration from disk", err)
    }

    //初始化Command结构体指针
    cmd := &cobra.Command{
        Use: "ks-apiserver",
        Long: `The KubeSphere API server validates and configures data for the API objects.
The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
        RunE: func(cmd *cobra.Command, args []string) error {
            //校验各个组件适配器模块的配置参数
            if errs := s.Validate(); len(errs) != 0 {
                //error聚合,返回一个实现Aggregate接口的实例
                return utilerrors.NewAggregate(errs)
            }
            //主处理逻辑,信号处理函数返回一个ctx
            return Run(s, signals.SetupSignalHandler())
        },
        SilenceUsage: true,
    }

    //初始化一个flag.FlagSet结构体指针
    fs := cmd.Flags()
    //不同组件适配器的参数归属于各自的FlagSet
    namedFlagSets := s.Flags()
    for _, f := range namedFlagSets.FlagSets {
        fs.AddFlagSet(f)
    }

    //设置命令行帮助函数
    usageFmt := "Usage:\n  %s\n"
    cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
    cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
        fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
        cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
    })

    //初始化version子命令
    versionCmd := &cobra.Command{
        Use:   "version",
        Short: "Print the version of KubeSphere ks-apiserver",
        Run: func(cmd *cobra.Command, args []string) {
            cmd.Println(version.Get())
        },
    }
    //添加version子命令
    cmd.AddCommand(versionCmd)

    return cmd
}

Run函数

主处理逻辑Run函数

func Run(s *options.ServerRunOptions, ctx context.Context) error {
    //初始化apiserver结构体指针
    apiserver, err := s.NewAPIServer(ctx.Done())
    if err != nil {
        return err
    }
    //apiserver准备运行阶段
    err = apiserver.PrepareRun(ctx.Done())
    if err != nil {
        return nil
    }

    return apiserver.Run(ctx)
}

NewAPIServer函数

初始化apiserver结构体指针

// NewAPIServer creates an APIServer instance using given options
func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) {
    //Config结构体赋值
    apiServer := &apiserver.APIServer{
        Config: s.Config,
    }
    //初始化k8s clisentSet
    kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
    if err != nil {
        return nil, err
    }
    apiServer.KubernetesClient = kubernetesClient
    //实现了InformerFactory接口,初始化一个聚合型的共享informer,包含多个不同组件的informer
    informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),
        kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
    apiServer.InformerFactory = informerFactory

    //判断是否定义监控参数
    if s.MonitoringOptions == nil || len(s.MonitoringOptions.Endpoint) == 0 {
        return nil, fmt.Errorf("moinitoring service address in configuration MUST not be empty, please check configmap/kubesphere-config in kubesphere-system namespace")
    } else {
        //初始化prometheus client,实现了monitoring.Interface接口
        monitoringClient, err := prometheus.NewPrometheus(s.MonitoringOptions)
        if err != nil {
            return nil, fmt.Errorf("failed to connect to prometheus, please check prometheus status, error: %v", err)
        }
        apiServer.MonitoringClient = monitoringClient
    }
    //初始化MetricsClient
    apiServer.MetricsClient = metricsserver.NewMetricsClient(kubernetesClient.Kubernetes(), s.KubernetesOptions)
    //判断是否定义logging参数
    if s.LoggingOptions.Host != "" {
        //初始化loggingClient,实现了logging.Client接口
        loggingClient, err := esclient.NewClient(s.LoggingOptions)
        if err != nil {
            return nil, fmt.Errorf("failed to connect to elasticsearch, please check elasticsearch status, error: %v", err)
        }
        apiServer.LoggingClient = loggingClient
    }
    //判断是否定义S3参数
    if s.S3Options.Endpoint != "" {
        //如果启用Debug模式,进入fake赋值
        if s.S3Options.Endpoint == fakeInterface && s.DebugMode {
            apiServer.S3Client = fakes3.NewFakeS3()
        } else {
            //初始化s3Client,实现了s3 Interface接口
            s3Client, err := s3.NewS3Client(s.S3Options)
            if err != nil {
                return nil, fmt.Errorf("failed to connect to s3, please check s3 service status, error: %v", err)
            }
            apiServer.S3Client = s3Client
        }
    }
    //判断是否定义Devops参数
    if s.DevopsOptions.Host != "" {
        //初始化devopsClient,实现了devops.Interface接口
        devopsClient, err := jenkins.NewDevopsClient(s.DevopsOptions)
        if err != nil {
            return nil, fmt.Errorf("failed to connect to jenkins, please check jenkins status, error: %v", err)
        }
        apiServer.DevopsClient = devopsClient
    }
    //判断是否定义SonarQube参数
    if s.SonarQubeOptions.Host != "" {
        //初始化sonarClient
        sonarClient, err := sonarqube.NewSonarQubeClient(s.SonarQubeOptions)
        if err != nil {
            return nil, fmt.Errorf("failed to connecto to sonarqube, please check sonarqube status, error: %v", err)
        }
        apiServer.SonarClient = sonarqube.NewSonar(sonarClient.SonarQube())
    }

    var cacheClient cache.Interface
    //判断是否定义Redis参数
    if s.RedisOptions != nil && len(s.RedisOptions.Host) != 0 {
        //如果启用Debug模式,进入fake赋值
        if s.RedisOptions.Host == fakeInterface && s.DebugMode {
            apiServer.CacheClient = cache.NewSimpleCache()
        } else {
            //初始化cacheClient,实现了cache Interface接口
            cacheClient, err = cache.NewRedisClient(s.RedisOptions, stopCh)
            if err != nil {
                return nil, fmt.Errorf("failed to connect to redis service, please check redis status, error: %v", err)
            }
            apiServer.CacheClient = cacheClient
        }
    } else {
        klog.Warning("ks-apiserver starts without redis provided, it will use in memory cache. " +
            "This may cause inconsistencies when running ks-apiserver with multiple replicas.")
        apiServer.CacheClient = cache.NewSimpleCache()
    }
    //判断是否定义Events参数
    if s.EventsOptions.Host != "" {
        //初始化eventsClient,实现了events.Client接口
        eventsClient, err := eventsclient.NewClient(s.EventsOptions)
        if err != nil {
            return nil, fmt.Errorf("failed to connect to elasticsearch, please check elasticsearch status, error: %v", err)
        }
        apiServer.EventsClient = eventsClient
    }
    //判断是否定义Auditing参数
    if s.AuditingOptions.Host != "" {
        //初始化auditingClient,实现了auditing.Client接口
        auditingClient, err := auditingclient.NewClient(s.AuditingOptions)
        if err != nil {
            return nil, fmt.Errorf("failed to connect to elasticsearch, please check elasticsearch status, error: %v", err)
        }
        apiServer.AuditingClient = auditingClient
    }
    //判断是否定义Alerting参数
    if s.AlertingOptions != nil && (s.AlertingOptions.PrometheusEndpoint != "" || s.AlertingOptions.ThanosRulerEndpoint != "") {
        //初始化alertingClient,实现了alerting RuleClient接口
        alertingClient, err := alerting.NewRuleClient(s.AlertingOptions)
        if err != nil {
            return nil, fmt.Errorf("failed to init alerting client: %v", err)
        }
        apiServer.AlertingClient = alertingClient
    }
    //初始化一个http.Server结构体指针
    server := &http.Server{
        Addr: fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort),
    }
    //判断是否定义SecurePort参数
    if s.GenericServerRunOptions.SecurePort != 0 {
        //加载tls配置
        certificate, err := tls.LoadX509KeyPair(s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey)
        if err != nil {
            return nil, err
        }

        server.TLSConfig = &tls.Config{
            Certificates: []tls.Certificate{certificate},
        }
        server.Addr = fmt.Sprintf(":%d", s.GenericServerRunOptions.SecurePort)
    }

    sch := scheme.Scheme
    if err := apis.AddToScheme(sch); err != nil {
        klog.Fatalf("unable add APIs to scheme: %v", err)
    }
    //初始化controller-runtime cache
    apiServer.RuntimeCache, err = runtimecache.New(apiServer.KubernetesClient.Config(), runtimecache.Options{Scheme: sch})
    if err != nil {
        klog.Fatalf("unable to create controller runtime cache: %v", err)
    }
    //初始化controller-runtime client
    apiServer.RuntimeClient, err = runtimeclient.New(apiServer.KubernetesClient.Config(), runtimeclient.Options{Scheme: sch})
    if err != nil {
        klog.Fatalf("unable to create controller runtime client: %v", err)
    }
    //初始化Issuer实例, 实现了token Issuer接口
    apiServer.Issuer, err = token.NewIssuer(s.AuthenticationOptions)
    if err != nil {
        klog.Fatalf("unable to create issuer: %v", err)
    }
    //赋值給apiServer
    apiServer.Server = server
    //返回apiServer结构体指针
    return apiServer, nil
}

PrepareRun函数

apiserver准备运行阶段

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
    //初始化container结构体指针
    s.container = restful.NewContainer()
    //设置container过滤器,这些过滤器先被调用到,在http请求到达后端web路由服务之前
    s.container.Filter(logRequestAndResponse)
    //设置container默认路由
    s.container.Router(restful.CurlyRouter{})
    //设置container Recover处理函数
    s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
        logStackOnRecover(panicReason, httpWriter)
    })
    //设置container kubersphere web路由服务
    s.installKubeSphereAPIs()
    //
    s.installMetricsAPI()
    //
    s.container.Filter(monitorRequest)

    for _, ws := range s.container.RegisteredWebServices() {
        klog.V(2).Infof("%s", ws.RootPath())
    }

    s.Server.Handler = s.container

    s.buildHandlerChain(stopCh)

    return nil
}

设置container KubeSphere api web路由服务,初始化所有的kubesphere api路由是发生在informers启动之前,所以 通过lister获取到的对象将会是空的

func (s *APIServer) installKubeSphereAPIs() {
    //实现了IdentityManagementInterface接口
    imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
        user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
            s.InformerFactory.KubernetesSharedInformerFactory()),
        loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
        s.Config.AuthenticationOptions)
    //实现了AccessManagementInterface接口
    amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
        s.KubernetesClient.Kubernetes(),
        s.InformerFactory,
        s.DevopsClient)
    //初始化一个RBACAuthorizer结构体指针
    rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
    //添加oauth,configz,gpu资源对象API路由
    urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
    //添加cluster、component、namespace v1alpha3资源对象API路由
    urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
    //添加metric资源对象API路由
    urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
    //添加meter资源对象API路由
    urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache, s.Config.MeteringOptions, nil))
    //添加openpitrix v1资源对象API路由
    urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
    //添加openpitrix v2alpha1资源对象API路由
    urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
    //添加openpitrix v1alpha2资源对象API路由
    urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
    //添加cluster、component、namespace v1alpha2资源对象API路由
    urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
        s.KubernetesClient.Master()))
    //添加tenant v1alpha2资源对象API路由
    urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
        s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
    //添加terminal v1alpha2资源对象API路由
    urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config()))
    //添加clusterkapis v1alpha1资源对象API路由
    urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
        s.InformerFactory.KubernetesSharedInformerFactory(),
        s.InformerFactory.KubeSphereSharedInformerFactory(),
        s.Config.MultiClusterOptions.ProxyPublishService,
        s.Config.MultiClusterOptions.ProxyPublishAddress,
        s.Config.MultiClusterOptions.AgentImage))
    //添加iamapi资源对象API路由
    urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
        group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()),
        rbacAuthorizer))

    userLister := s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister()
    //添加oauth资源对象API路由
    urlruntime.Must(oauth.AddToContainer(s.container, imOperator,
        auth.NewTokenOperator(s.CacheClient, s.Issuer, s.Config.AuthenticationOptions),
        auth.NewPasswordAuthenticator(s.KubernetesClient.KubeSphere(), userLister, s.Config.AuthenticationOptions),
        auth.NewOAuthAuthenticator(s.KubernetesClient.KubeSphere(), userLister, s.Config.AuthenticationOptions),
        auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(), userLister),
        s.Config.AuthenticationOptions))
    //添加servicemesh v1alpha2资源对象API路由
    urlruntime.Must(servicemeshv1alpha2.AddToContainer(s.Config.ServiceMeshOptions, s.container, s.KubernetesClient.Kubernetes(), s.CacheClient))
    //添加network v1alpha2资源对象API路由
    urlruntime.Must(networkv1alpha2.AddToContainer(s.container, s.Config.NetworkOptions.WeaveScopeHost))
    //添加devops v1alpha3资源对象API路由
    urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.Config.DevopsOptions.Endpoint))
    //添加devops v1alpha3资源对象API路由
    urlruntime.Must(devopsv1alpha3.AddToContainer(s.container, s.Config.DevopsOptions.Endpoint))
    //添加notification v1资源对象API路由
    urlruntime.Must(notificationv1.AddToContainer(s.container, s.Config.NotificationOptions.Endpoint))
    //添加alerting v1资源对象API路由
    urlruntime.Must(alertingv1.AddToContainer(s.container, s.Config.AlertingOptions.Endpoint))
    //添加alerting v2alpha1资源对象API路由
    urlruntime.Must(alertingv2alpha1.AddToContainer(s.container, s.InformerFactory,
        s.KubernetesClient.Prometheus(), s.AlertingClient, s.Config.AlertingOptions))
    //添加version资源对象API路由
    urlruntime.Must(version.AddToContainer(s.container, s.KubernetesClient.Discovery()))
    //添加kubeedge v1alpha1资源对象API路由
    urlruntime.Must(kubeedgev1alpha1.AddToContainer(s.container, s.Config.KubeEdgeOptions.Endpoint))
    //添加notificationkapis v2beta1资源对象API路由
    urlruntime.Must(notificationkapisv2beta1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
        s.KubernetesClient.KubeSphere()))
    //添加notificationkapis v2beta2资源对象API路由
    urlruntime.Must(notificationkapisv2beta2.AddToContainer(s.container, s.Config.NotificationOptions))
    //添加gateway v1alpha1资源对象API路由
    urlruntime.Must(gatewayv1alpha1.AddToContainer(s.container, s.Config.GatewayOptions, s.RuntimeCache, s.RuntimeClient, s.InformerFactory, s.KubernetesClient.Kubernetes(), s.LoggingClient))
}

ks-devops-apiserver

ks-devops-controller

jenkins需要安装pipeline,BlueOcean插件

Blue Ocean简介

Blue Ocean是jenkins上的一款插件,它能帮助你使用可视化的方式创建流水线,根据需要添加同时执行的阶段和并行任务,并且会将创建好的JenkinsFile自动上传到代码仓库。 美观简洁的界面和功能丰富的导航栏及各类按钮能帮助你高效获取项目运行信息,同时Blue Ocean也支持用户使用传统方式编辑项目,方便用户灵活选择。

BlueOcean rest api接口: https://github.com/jenkinsci/blueocean-plugin/blob/master/blueocean-rest/README.md

实现原理

DevOps项目、流水线、流水线运行分别对应DevOpsProject、Pipeline、PipelineRun crd

DevOps项目:

apiVersion: devops.kubesphere.io/v1alpha3
kind: DevOpsProject
metadata:
  annotations:
    devopsproject.devops.kubesphere.io/syncstatus: successful
    devopsproject.devops.kubesphere.io/synctime: 2022-07-15 05:33:51.436609491 +0000
      UTC m=+5319.051527551
    kubesphere.io/creator: admin
  creationTimestamp: "2022-07-15T05:33:51Z"
  finalizers:
  - devopsproject.finalizers.kubesphere.io
  generateName: test
  generation: 3
  labels:
    kubesphere.io/workspace: test2
  name: test59rn2
  resourceVersion: "62670680"
  uid: b483f632-1105-426c-8a14-ba7428d24d7b
spec:
  argo:
    clusterResourceWhitelist:
    - group: '*'
      kind: '*'
    destinations:
    - namespace: '*'
      server: '*'
    sourceRepos:
    - '*'
status:
  adminNamespace: test59rn2

流水线:

apiVersion: devops.kubesphere.io/v1alpha3
kind: Pipeline
metadata:
  annotations:
    kubesphere.io/creator: admin
    pipeline.devops.kubesphere.io/jenkins-metadata: '{"weatherScore":100,"estimatedDurationInMillis":-1,"name":"icetest"}'
  finalizers:
  - pipeline.finalizers.kubesphere.io
  name: icetest
  namespace: test2rglgf 
spec:
  pipeline:
    discarder:
      days_to_keep: "7"
      num_to_keep: "10"
    jenkinsfile: |2-
            pipeline {
              agent any

              parameters {
                string defaultValue: 'rick', description: 'just for testing', name: 'name', trim: true
                booleanParam defaultValue: false, description: 'You can use this flag to debug your Pipeline', name: 'debug'
                choice choices: ['v1.18.8', 'v1.18.9'], description: 'Please choose the target Kubernetes version', name: 'kubernetesVersion'
              }

              stages{
                stage('simple'){
                  steps{
                    echo "My name is ${params.name}."
                    echo "Target Kubernetes version is " + params.kubernetesVersion

                    script {
                      if ("${params.debug}" == "true") {
                        echo "You can put some debug info at here."
                        echo "Another way to use param: " + params.name
                      }
                    }
                  }
                }

                stage('debug stage') {
                  when {
                    equals expected: true, actual: params.debug
                  }
                  steps {
                    echo "It's joke, there're debug info here."
                  }
                }
              }
            }
    name: icetest
  type: pipeline
status: {}

流水线运行:

apiVersion: devops.kubesphere.io/v1alpha3
kind: PipelineRun
metadata:
  annotations:
    devops.kubesphere.io/creator: admin
  finalizers:
  - pipelinerun.finalizers.kubesphere.io
  generateName: icetest-
  labels:
    devops.kubesphere.io/pipeline: icetest
  name: icetest-fxrl7
  namespace: test2rglgf 
  ownerReferences:
  - apiVersion: devops.kubesphere.io/v1alpha3
    blockOwnerDeletion: true
    controller: true
    kind: Pipeline
    name: icetest
    uid: ad52a1ce-db8b-472d-b1a8-2c01fbfafb43
spec:
  pipelineRef:
    kind: Pipeline
    name: icetest
    namespace: test2rglgf 
  pipelineSpec:
    pipeline:
      discarder:
        days_to_keep: "7"
        num_to_keep: "10"
      jenkinsfile: |2-
              pipeline {
                agent any

                parameters {
                  string defaultValue: 'rick', description: 'just for testing', name: 'name', trim: true
                  booleanParam defaultValue: false, description: 'You can use this flag to debug your Pipeline', name: 'debug'
                  choice choices: ['v1.18.8', 'v1.18.9'], description: 'Please choose the target Kubernetes version', name: 'kubernetesVersion'
                }

                stages{
                  stage('simple'){
                    steps{
                      echo "My name is ${params.name}."
                      echo "Target Kubernetes version is " + params.kubernetesVersion

                      script {
                        if ("${params.debug}" == "true") {
                          echo "You can put some debug info at here."
                          echo "Another way to use param: " + params.name
                        }
                      }
                    }
                  }

                  stage('debug stage') {
                    when {
                      equals expected: true, actual: params.debug
                    }
                    steps {
                      echo "It's joke, there're debug info here."
                    }
                  }
                }
              }
      name: icetest
    type: pipeline
status: {}

5. 调试ks-devops

5.1 部署ks-devops

  • 安装好kubesphere
  • 启用devops插件,对应的部署控制器会自动安装devops相关组件
# kubectl patch -nkubesphere-system cc ks-installer --type=json -p='[{"op": "replace", "path": "/spec/devops/enabled", "value": true}]'
[root@devops ~]# kubectl -n kubesphere-devops-system get deployment
NAME                READY   UP-TO-DATE   AVAILABLE   AGE
devops-apiserver    1/1     1            1           3d
devops-controller   1/1     1            1           3d
devops-jenkins      1/1     1            1           3d

在kubesphere-devops-system命名空间下会多出devops-apiserver、devops-controller、devops-jenkins组件, 作用分别是:

  • devops-apiserver:作为devops的rest api server,基于go-restful框架实现,使用框架的filter机制 跟认证鉴权集成;对前跟UI交互,对后跟crd交互
  • devops-controller: devops crd的controller实现,负责devops流水线的全生命周期管理,有7个crd定义
[root@devops ~]# kubectl get crd |grep devops
devopsprojects.devops.kubesphere.io                   2022-06-10T03:38:22Z
pipelineruns.devops.kubesphere.io                     2022-06-10T03:38:22Z
pipelines.devops.kubesphere.io                        2022-06-10T03:38:22Z
s2ibinaries.devops.kubesphere.io                      2022-06-10T03:38:22Z
s2ibuilders.devops.kubesphere.io                      2022-06-10T03:38:22Z
s2ibuildertemplates.devops.kubesphere.io              2022-06-10T03:38:22Z
s2iruns.devops.kubesphere.io                          2022-06-10T03:38:22Z
  • devops-jenkins:jenkins容器化部署,可定制jenkins镜像

5.2 goland代码调试

克隆ks-devops代码到本地,以Goland IDE为例,调试本地代码

# git clone https://github.com/kubesphere/ks-devops

5.2.1 devops-apiserver本地调试

需要执行的步骤有:

  1. 从远程服务器上devops-config configmap中拷贝一份kubesphere.yaml配置文件,并保存至/etc/kubesphere/目录下
  2. 修改其中引用到的服务暴露方式为nodePort,需要修改的service有:kubesphere-devops-system/devops-jenkins,kubesphere-system/minio
  3. 从远程服务器上拷贝/root/.kube/config文件并保存至/etc/kubesphere/目录下,修改配置文件中的server地址为服务器IP
[root@devops ~]# kubectl -n kubesphere-devops-system get cm devops-config
XIABINGYAO-MB2:ks-devops iceyao$ cat /etc/kubesphere/kubesphere.yaml
kubernetes:
  kubeconfig: "/etc/kubesphere/config"
authentication:
  authenticateRateLimiterDuration: 10m0s
  authenticateRateLimiterMaxTries: "10"
  jwtSecret: FtoWMbXiivdjsCwmD89AbtJYBnOpE7qb
  loginHistoryRetentionPeriod: 168h
  maximumClockSkew: 10s
devops:
  host: http://<远程服务器IP>:30180
  maxConnections: "100"
  namespace: kubesphere-devops-system
  password: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwidG9rZW5fdHlwZSI6ImFjY2Vzc190b2tlbiIsImlhdCI6MTY1NDgzMjQ5OSwiaXNzIjoia3ViZXNwaGVyZSIsIm5iZiI6MTY1NDgzMjQ5OX0.avybWXQGpaSUJ69u5ucgLNzPAWLgYeDqCs90Yz0oxyc
  username: admin
  workerNamespace: kubesphere-devops-worker
ldap:
  groupSearchBase: ou=Groups,dc=kubesphere,dc=io
  host: openldap.kubesphere-syst:389
  managerDN: cn=admin,dc=kubesphere,dc=io
  managerPassword: admin
  userSearchBase: ou=Users,dc=kubesphere,dc=io
s3:
  accessKeyID: openpitrixminioaccesskey
  bucket: s2i-binaries
  disableSSL: "true"
  endpoint: http://minio.kubesphere-system.svc:9000
  forcePathStyle: "true"
  region: us-east-1
  secretAccessKey: openpitrixminiosecretkey

4.修改/etc/kubesphere/kubesphere.yaml完毕后,就可以启动本地的devops-apiserver服务,Goland debug方式启动

# 启动之前,先生成swagger-ui
XIABINGYAO-MB2:ks-devops iceyao$ make swagger-ui 
git clone https://github.com/swagger-api/swagger-ui -b v2.2.10 --depth 1 bin/swagger-ui
Cloning into 'bin/swagger-ui'...
remote: Enumerating objects: 190, done.
remote: Counting objects: 100% (190/190), done.
remote: Compressing objects: 100% (172/172), done.
remote: Total 190 (delta 11), reused 150 (delta 6), pack-reused 0
Receiving objects: 100% (190/190), 1.26 MiB | 4.33 MiB/s, done.
Resolving deltas: 100% (11/11), done.
Note: switching to '64dc3060b3700b12e466f8d67b7d7ec3574b015f'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c <new-branch-name>

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false
# 浏览器访问swagger-ui url
GOROOT=/usr/local/go #gosetup
GOPATH=/Users/iceyao/Documents #gosetup
/usr/local/go/bin/go build -o /private/var/folders/vm/ry3ff3355gnfvmgt8w17872c0000gn/T/GoLand/___1devops_apiserver -gcflags all=-N -l /Users/iceyao/Documents/src/kubesphere.io/ks-devops/cmd/apiserver/apiserver.go #gosetup
/Applications/GoLand.app/Contents/plugins/go/lib/dlv/mac/dlv --listen=127.0.0.1:61009 --headless=true --api-version=2 --check-go-version=false --only-same-user=false exec /private/var/folders/vm/ry3ff3355gnfvmgt8w17872c0000gn/T/GoLand/___1devops_apiserver --
API server listening at: 127.0.0.1:61009
debugserver-@(#)PROGRAM:LLDB  PROJECT:lldb-1300.0.42.3
 for x86_64.
Got a connection, launched process /private/var/folders/vm/ry3ff3355gnfvmgt8w17872c0000gn/T/GoLand/___1devops_apiserver (pid = 84550).
W0614 14:10:45.302978   84550 options.go:142] ks-apiserver starts without redis provided, it will use in memory cache. This may cause inconsistencies when running ks-apiserver with multiple replicas.
I0614 14:10:45.351856   84550 sonarqube.go:55] Sonarqube integration is disabled
I0614 14:10:45.356766   84550 sonarqube.go:55] Sonarqube integration is disabled
[restful] 2022/06/14 14:10:45 log.go:33: [restful/swagger] listing is available at /apidocs.json
[restful] 2022/06/14 14:10:45 log.go:33: [restful/swagger] /apidocs/ is mapped to folder bin/swagger-ui/dist
I0614 14:10:45.393248   84550 apiserver.go:232] Start cache objects
I0614 14:10:45.617033   84550 apiserver.go:287] Finished caching objects
I0614 14:10:45.617082   84550 apiserver.go:195] Start listening on :9090
I0614 14:10:45.617101   84550 apiserver.go:196] Open the swagger-ui from http://localhost:9090/apidocs/?url=http://localhost:9090/apidocs.json

5.让远程服务器上的UI访问到本地的devops-apiserver服务,停掉远程服务器上kubesphere环境的devops-apiserver服务

[root@devops ~]# kubectl -n kubesphere-devops-system scale deployment devops-apiserver --replicas=0
deployment.apps/devops-apiserver scaled

6.设置ssh反向代理,把本地devops-apiserver映射到远程服务器的9090端口

# 编辑远端服务器的ssh配置文件
[root@devops ~]# vim /etc/ssh/sshd_config
AllowTcpForwarding yes
GatewayPorts yes

[root@devops ~]# service sshd restart
Redirecting to /bin/systemctl restart sshd.service
# 本地执行ssh反向代理,把本地devops-apisever的9090映射到远端服务器上的9090(端口根据实际情况映射)
XIABINGYAO-MB2:~ iceyao$ ssh -R 9090:localhost:9090 root@《远程服务器IP》

7.备份并修改kubesphere-config configmap, 让前端UI服务能够访问到本地的devops-apiserver服务

# 备份原来的configmap
[root@devops ~]# kubectl -n kubesphere-system get cm kubesphere-config -o yaml > kubesphere-system-kubesphere-config-cm.yaml
[root@devops ~]# kubectl -n kubesphere-system edit cm kubesphere-config
    network:
      ippoolType: none
    devops:
      host: http://devops-jenkins.kubesphere-devops-system.svc/
      username: admin
      password: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6ImFkbWluQGt1YmVzcGhlcmUuaW8iLCJ1c2VybmFtZSI6ImFkbWluIiwidG9rZW5fdHlwZSI6InN0YXRpY19
0b2tlbiJ9.qBb3GqTVotNRphWTkHOtpMUdgmhk_hOh_rAusEe4EaE
      maxConnections: 100
      endpoint: http://<远程服务器IP>:9090
# 重启ks-apiserver生效
[root@devops ~]# kubectl -n kubesphere-system delete pod ks-apiserver-5775575684-26tpb

8.Goland设置devops-apiserver断点,以http://xxx:30880/kapis/devops.kubesphere.io/v1alpha3/namespaces/testxx86v/pipelines/test/pipelineruns为例

kubesphere.io/ks-devops/pkg/kapis/devops/v1alpha3/pipelinerun/handler.go的listPipelineRuns函数设置断点, 浏览器查看正在运行的流水线就会触发断点捕获(调用api的方式当然也可以触发断点捕获);对apiserver代码得有个大概了解,这样设置的断点才能被捕获到

5.2.2 devops-controller本地调试

调试方式跟devops-apiserver类似

原理分析

动态构建 jenkins slave(Jenkins Kubernetes插件)

![](/img/posts/2021-09-28/动态构建jenkins slave.png)

动态构建 jenkins slave,根据需要自动创建 Pod(解决并发构建问题);KubeSphere的CI/CD是基于底层Kubernetes的动态Jenkins Slave, 也就是说 Jenkins Slave 具有动态伸缩的能力,能够根据任务的执行状态进行动态创建或自动注销释放资源。实际上, Jenkins Master和Jenkins Slave以Pod形式运行在KubeSphere集群的Node上,Master运行在其中一个节点, 并且将其配置数据存储到一个Volume中,Slave运行在各个节点上,并且它不是一直处于运行状态,它会按照需求动态的创建并自动删除。Jenkins Kubernetes插件 可以实现以上这些功能。

KubeSphere DevOps每次活动运行对应一个特定jenkins label的pod

[root@k8s-1 ~]# kubectl -n kubesphere-devops-system get pod maven-mzlkd
NAME          READY   STATUS    RESTARTS   AGE
maven-mzlkd   2/2     Running   0          15s

Jenkins内置podTemplate位置:系统管理 -> 系统配置 -> Cloud -> Pod Templates

Jenkins发布审批与外部流程中心打通

基于Jenkinsfile pipeline语法input语句设置卡点,形如input(id: 'deploy-to-prod', message: 'deploy to prod'),yes就继续下一个step,no就终止; 待审批流程全部通过后,自动回调接口填充yes参数继续下一个step

Jenkins流水线集成SonarQube

客户端可以通过IDE插件、Sonar-Scanner插件、Ant插件和Maven插件方式进行扫描分析

Issue

git push账户名密码方式已被废弃,配置流水线的时候使用personal access token,personal access token使用方式如下:

git push http://<username>:<personal access token>@github.com/yaoice/devops-java-sample.git

参考链接

「真诚赞赏,手留余香」

爱折腾的工程师

真诚赞赏,手留余香

使用微信扫描二维码完成支付