pcm-kubernetes/app/namespace.go

630 lines
20 KiB
Go

package app
import (
"context"
"encoding/json"
"github.com/gin-gonic/gin"
"github.com/jmoiron/sqlx"
"github.com/karmada-io/karmada/pkg/util"
"io"
v1 "k8s.io/api/batch/v1"
coreV1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"sigs.k8s.io/yaml"
"sort"
"strconv"
"strings"
"sync"
"time"
)
// Prometheus query templates and sampling settings used by the namespace
// monitoring handlers in this file.
//
// The literal tokens clusterName and nameSpace inside the templates are
// placeholders; the handlers pass a replacement map for them to GetMetricUrl
// (presumably a plain textual substitution — confirm in GetMetricUrl).
const (
	// namespace_memory_usage_wo_cache selects the namespace memory usage
	// (without cache) recording rule for one cluster/namespace pair.
	namespace_memory_usage_wo_cache = "namespace:container_memory_usage_bytes_wo_cache:sum{cluster_name='clusterName',namespace='nameSpace', prometheus_replica='prometheus-k8s-0'}"

	// namespace_cpu_usage sums the workload CPU usage recording rule by namespace.
	namespace_cpu_usage = "sum by (namespace) (namespace:workload_cpu_usage:sum{cluster_name='clusterName',namespace='nameSpace', prometheus_replica='prometheus-k8s-0'})"

	// namespace_pod_count counts the distinct pods reported by kube_pod_container_info.
	namespace_pod_count = "count(sum by (pod) (kube_pod_container_info{cluster_name='clusterName', namespace='nameSpace', prometheus_replica='prometheus-k8s-0'}))"

	// metric_range_12h is the range argument GetNamespaceMetrics passes to GetMetricUrl.
	metric_range_12h = "12h"

	// steps_namespace is the step argument GetNamespaceMetrics passes to GetMetricUrl.
	steps_namespace = 1080
)
// Namespace is the JSON view of a namespace (project) returned by
// DescribeNamespace.
type Namespace struct {
	// NsName is the namespace name.
	NsName string `json:"ns_name"`
	// State is the namespace phase rendered as a string.
	State string `json:"state"`
	// Age is the time since creation, formatted as whole days with a "d" suffix.
	Age string `json:"age"`

	CrossDomain bool `json:"cross_domain"`

	// Domains, Clusters and Deployments describe where the namespace's
	// workloads run. DescribeNamespace fills Deployments with
	// "<deployment>:<cluster>" entries.
	Domains     []Domain `json:"domains"`
	Clusters    []string `json:"clusters"`
	Deployments []string `json:"deployments"`

	// RequirePodNum and AvailablePodNum are the summed requested and
	// available replica counts of the namespace's deployments.
	RequirePodNum   int32 `json:"require_pod_num"`
	AvailablePodNum int32 `json:"available_pod_num"`

	Alias      string `json:"alias"`
	Describe   string `json:"describe"`
	TemplateId string `json:"templateId"`
}
// NamespaceParam is the request body decoded by CreateNamespace.
type NamespaceParam struct {
	// TemplateId is forwarded to the propagation policy on the "jcce" path.
	TemplateId string `json:"templateId"`
	// Namespace is the Kubernetes namespace object to create.
	Namespace coreV1.Namespace `json:"namespace"`
	// ClusterName lists the target clusters. When the namespace is not
	// labelled jcce=true, only the first entry is used.
	ClusterName []string `json:"clusterName"`
}
// DomainResult pairs a domain name with a two-element location.
type DomainResult struct {
	DomainName string `json:"domain_name"`
	// Location holds two coordinates.
	// NOTE(review): presumably [longitude, latitude], matching how
	// DescribeNamespace fills Domain.Location — confirm against callers.
	Location [2]float64 `json:"location"`
}
// NamespaceObject is the decode target for a namespace list response; the
// handlers in this file only read Items.
//
// NOTE(review): Metadata is declared as a string. If the response carries a
// JSON object under "metadata" (as Kubernetes list responses normally do),
// json.Unmarshal reports a type error for that field while still populating
// Items — confirm before adding strict error checks to callers.
type NamespaceObject struct {
	Kind            string             `json:"kind"`
	ApiVersion      string             `json:"apiVersion"`
	Metadata        string             `json:"metadata"`
	ResourceVersion string             `json:"resourceVersion"`
	Items           []coreV1.Namespace `json:"items"`
}
// JobObject is the decode target for a batch/v1 Job list response; the
// handlers in this file only read Items.
//
// NOTE(review): Metadata is declared as a string. If the response carries a
// JSON object under "metadata", json.Unmarshal reports a type error for that
// field while still populating Items — confirm before adding strict error
// checks to callers.
type JobObject struct {
	Kind            string   `json:"kind"`
	ApiVersion      string   `json:"apiVersion"`
	Metadata        string   `json:"metadata"`
	ResourceVersion string   `json:"resourceVersion"`
	Items           []v1.Job `json:"items"`
}
// NamespacePodCount is one row of the ListPodsCount response.
type NamespacePodCount struct {
	// Namespace is the namespace name.
	Namespace string `json:"namespace"`
	// Count is the sum of available replicas over the namespace's deployments.
	Count int32 `json:"count"`
	// DomainType is true when ListPodsCount found a deployment of the
	// namespace whose propagation policy matched more than one domain_cluster row.
	DomainType bool `json:"domainType"`
}
// NsResources is the GetNamespaceResources response payload: the number of
// objects of each kind found in one namespace.
type NsResources struct {
	PodCount         int `json:"podCount"`
	DeploymentCount  int `json:"deploymentCount"`
	StatefulSetCount int `json:"statefulSetCount"`
	DaemonSetCount   int `json:"daemonSetCount"`
	JobCount         int `json:"jobCount"`
	PvcCount         int `json:"pvcCount"`
	ServiceCount     int `json:"serviceCount"`
}
// GetNamespaceResources reports how many pods, deployments, stateful sets,
// daemon sets, jobs, PVCs and services exist in one namespace of one cluster.
//
// Query parameters:
//   - clusterName: cluster to query
//   - namespace:   namespace to inspect
//
// Responds 200 with an NsResources payload. If any lookup or decode fails the
// handler responds 500 instead of silently reporting zero for that kind.
func GetNamespaceResources(c *gin.Context) {
	clusterName := c.Query("clusterName")
	namespace := c.Query("namespace")

	// count returns the length of the "items" array of a list response.
	// Only "items" is decoded, as raw messages, so the count does not depend
	// on the element schema. The first error is remembered and later calls
	// become no-ops.
	var firstErr error
	count := func(raw []byte, err error) int {
		if firstErr != nil {
			return 0
		}
		if err != nil {
			firstErr = err
			return 0
		}
		var list struct {
			Items []json.RawMessage `json:"items"`
		}
		if err := json.Unmarshal(raw, &list); err != nil {
			firstErr = err
			return 0
		}
		return len(list.Items)
	}

	var resources NsResources

	// Pods.
	podResult := SearchObject(podListConst, clusterName, namespace, "")
	resources.PodCount = count(podResult.Raw())
	// Deployments.
	deploymentResult := SearchObject(deploymentListConst, clusterName, namespace, "")
	resources.DeploymentCount = count(deploymentResult.Raw())
	// StatefulSets.
	statefulSetResult := SearchObject(statefulSetListConst, clusterName, namespace, "")
	resources.StatefulSetCount = count(statefulSetResult.Raw())
	// DaemonSets.
	daemonSetResult := SearchObject(daemonSetListConst, clusterName, namespace, "")
	resources.DaemonSetCount = count(daemonSetResult.Raw())
	// Jobs.
	jobResult := SearchObject(jobListConst, clusterName, namespace, "")
	resources.JobCount = count(jobResult.Raw())
	// PersistentVolumeClaims.
	pvcResult := SearchObject(pvcListConst, clusterName, namespace, "")
	resources.PvcCount = count(pvcResult.Raw())
	// Services.
	serviceResult := SearchObject(serviceConst, clusterName, namespace, "")
	resources.ServiceCount = count(serviceResult.Raw())

	if firstErr != nil {
		Response(c, http.StatusInternalServerError, "failed", firstErr)
		return
	}
	Response(c, http.StatusOK, "success", resources)
}
// CreateNamespace creates a namespace (project).
//
// When the namespace carries the label jcce=true it is created through
// ClientSet and a propagation policy named "namespace.<name>" is created for
// the requested clusters. Otherwise the namespace is posted directly to the
// first cluster in clusterName; an empty clusterName list is rejected with
// 400 rather than panicking on the index.
//
// NOTE(review): if the propagation policy cannot be created, the namespace
// created just before is left in place.
//
// @Summary Create a namespace (project)
// @Description Create a namespace (project)
// @Tags namespace
// @accept json
// @Produce json
// @Param param body Namespace true "json"
// @Success 200
// @Failure 400
// @Router /api/v1/namespace/create [post]
func CreateNamespace(c *gin.Context) {
	var param NamespaceParam
	if err := c.BindJSON(&param); err != nil {
		Response(c, http.StatusBadRequest, "invalid request params.", "")
		return
	}

	// Decide between distribution via a propagation policy and creation on a
	// specific cluster. Reading from a nil map is safe, so Labels needs no
	// separate nil check.
	if param.Namespace.Labels["jcce"] == "true" {
		_, createErr := ClientSet.CoreV1().Namespaces().Create(context.TODO(), &param.Namespace, metav1.CreateOptions{})
		if createErr != nil {
			Response(c, http.StatusInternalServerError, "failed", createErr)
			return
		}
		// Create the propagation policy instance.
		policyErr := CreatePropagationPolicies(PropagationPolicy{
			ClusterName:  param.ClusterName,
			TemplateId:   param.TemplateId,
			ResourceName: param.Namespace.Name,
			Name:         "namespace" + "." + param.Namespace.Name,
			Kind:         "Namespace",
		})
		if policyErr != nil {
			Response(c, http.StatusInternalServerError, "failed", policyErr)
			return
		}
		Response(c, http.StatusOK, "success", "")
		return
	}

	// The direct path needs a target cluster.
	if len(param.ClusterName) == 0 {
		Response(c, http.StatusBadRequest, "invalid request params.", "")
		return
	}
	result := PostObject(namespaceConst, param.ClusterName[0], "", "", param.Namespace)
	if err := result.Error(); err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	Response(c, http.StatusOK, "success", "")
}
// ListNamespaceFromCluster lists the namespaces of one cluster, newest first,
// with optional substring filtering and pagination.
//
// Query parameters:
//   - clusterName: cluster to query
//   - name:        optional substring the namespace name must contain
//   - pageNum, pageSize: passed through to Paginator
//
// Responds 500 when the cluster lookup fails, 200 with nil data when nothing
// matched, and 200 with the paginated result otherwise.
func ListNamespaceFromCluster(c *gin.Context) {
	clusterName := c.Query("clusterName")
	name := c.Query("name")
	pageNum := c.Query("pageNum")
	pageSize := c.Query("pageSize")

	searchObject := SearchObject(namespaceConst, clusterName, "", "")
	if err := searchObject.Error(); err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	raw, _ := searchObject.Raw() // request error already checked above

	var namespaceObject NamespaceObject
	// The decode error is deliberately not fatal: NamespaceObject declares
	// Metadata as a string, so a response whose "metadata" is an object makes
	// Unmarshal report a type error even though Items is still populated.
	_ = json.Unmarshal(raw, &namespaceObject)

	// Substring filter.
	var result []coreV1.Namespace
	if len(name) != 0 {
		for _, namespace := range namespaceObject.Items {
			if strings.Contains(namespace.Name, name) {
				result = append(result, namespace)
			}
		}
	} else {
		result = namespaceObject.Items
	}
	if result == nil {
		Response(c, http.StatusOK, "success", nil)
		return
	}

	// Newest first; stable so equal timestamps keep their listing order.
	sort.SliceStable(result, func(i, j int) bool {
		return result[i].CreationTimestamp.Time.After(result[j].CreationTimestamp.Time)
	})

	// Pagination.
	page := &Page[coreV1.Namespace]{}
	page.List = result
	data := Paginator(page, int64(len(result)), pageNum, pageSize)
	Response(c, http.StatusOK, "success", data)
}
// ListName returns the names of all namespaces (projects) of one cluster.
//
// Query parameters:
//   - clusterName: cluster to query
//
// Responds 500 when the cluster lookup fails, otherwise 200 with the list of
// names (nil when the cluster returned no namespaces).
func ListName(c *gin.Context) {
	clusterName := c.Query("clusterName")

	result := SearchObject(namespaceConst, clusterName, "", "")
	if err := result.Error(); err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	raw, _ := result.Raw() // request error already checked above

	var namespaceObject NamespaceObject
	// The decode error is deliberately not fatal: NamespaceObject declares
	// Metadata as a string, so a response whose "metadata" is an object makes
	// Unmarshal report a type error even though Items is still populated.
	_ = json.Unmarshal(raw, &namespaceObject)

	var namespaceList []string
	for i := range namespaceObject.Items {
		namespaceList = append(namespaceList, namespaceObject.Items[i].Name)
	}
	Response(c, http.StatusOK, "success", namespaceList)
}
// ListPodsCount returns, per namespace, the sum of available replicas over
// all deployments and a flag telling whether one of those deployments is
// propagated to more than one domain.
//
// A deployment is matched to a propagation policy by the case-insensitive
// name "deployment.<namespace>.<name>". Policies without a cluster affinity
// or without cluster names are skipped, as are policies whose database lookup
// fails; in those cases the namespace simply stays unflagged.
//
// The response is sorted by namespace name so the output is stable across
// calls. Responds 500 when either list call fails.
func ListPodsCount(c *gin.Context) {
	// List all workloads.
	deploymentList, err := ClientSet.AppsV1().Deployments("").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	policyList, err := KarmadaClient.PolicyV1alpha1().PropagationPolicies("").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}

	namespaceMap := make(map[string]int32)
	namespaceDomainMap := make(map[string]bool)
	for i := range deploymentList.Items {
		deployment := &deploymentList.Items[i]
		ns := deployment.Namespace

		// Accumulate the pod count per namespace; a missing key reads as 0.
		namespaceMap[ns] += deployment.Status.AvailableReplicas

		// Once a namespace is flagged, further lookups cannot change the result.
		if namespaceDomainMap[ns] {
			continue
		}

		// Use the propagation policy to find out how many domains the
		// deployment is distributed to.
		policyName := "deployment" + "." + ns + "." + deployment.Name
		for j := range policyList.Items {
			policy := &policyList.Items[j]
			if !strings.EqualFold(policy.Name, policyName) {
				continue
			}
			// ClusterAffinity is optional on a placement; guard the dereference.
			affinity := policy.Spec.Placement.ClusterAffinity
			if affinity == nil || len(affinity.ClusterNames) == 0 {
				continue
			}
			// Expand the cluster list into the IN clause.
			// NOTE(review): this counts domain_cluster rows, not distinct
			// domains — confirm that is the intended meaning of "cross domain".
			query, args, err := sqlx.In("SELECT COUNT( *) as domainCount from domain_cluster dc WHERE cluster_name in (?)", affinity.ClusterNames)
			if err != nil {
				continue
			}
			var domainCount int
			if err := DB.QueryRow(query, args...).Scan(&domainCount); err != nil {
				continue
			}
			if domainCount > 1 {
				namespaceDomainMap[ns] = true
				break
			}
		}
	}

	var result []NamespacePodCount
	for k, v := range namespaceMap {
		result = append(result, NamespacePodCount{
			Namespace:  k,
			Count:      v,
			DomainType: namespaceDomainMap[k],
		})
	}
	// Map iteration order is random; sort for a deterministic response.
	sort.Slice(result, func(i, j int) bool {
		return result[i].Namespace < result[j].Namespace
	})
	Response(c, http.StatusOK, "success", result)
}
// IsExistNamespace tells whether a namespace with the given name can be
// fetched from the given cluster.
//
// Query parameters: clusterName, name.
// Always responds 200; the data field is true only when the lookup succeeded
// and the decoded object has a non-empty name.
func IsExistNamespace(c *gin.Context) {
	cluster := c.Query("clusterName")
	nsName := c.Query("name")

	result := SearchObject(detailNamespaceConst, cluster, "", nsName)

	exists := false
	if result.Error() == nil {
		body, _ := result.Raw()
		var found coreV1.Namespace
		// A decode failure leaves found empty, which reads as "does not exist".
		json.Unmarshal(body, &found)
		exists = len(found.Name) != 0
	}
	Response(c, http.StatusOK, "success", exists)
}
// ListNamespace returns the names of all namespaces that carry the label
// jcce=true.
//
// NOTE(review): the annotations below declare pageNum and pageSize, but the
// handler does not read them.
//
// @Summary List namespaces
// @Description List namespaces
// @Tags namespace
// @accept json
// @Produce json
// @Param pageNum query string true "page number"
// @Param pageSize query string true "page size"
// @Success 200
// @Failure 400
// @Router /api/v1/namespace/list [get]
func ListNamespace(c *gin.Context) {
	namespaces, err := ClientSet.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}

	var names []string
	for i := range namespaces.Items {
		item := &namespaces.Items[i]
		if item.Labels["jcce"] != "true" {
			continue
		}
		names = append(names, item.Name)
	}
	Response(c, http.StatusOK, "success", names)
}
// DetailNamespace returns the full namespace object of one cluster.
//
// Query parameters: clusterName, name.
// Responds 500 when the lookup or the decoding fails (instead of returning an
// empty namespace with status 200), otherwise 200 with the namespace.
func DetailNamespace(c *gin.Context) {
	clusterName := c.Query("clusterName")
	name := c.Query("name")

	result := SearchObject(detailNamespaceConst, clusterName, "", name)
	if err := result.Error(); err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	raw, _ := result.Raw() // request error already checked above

	var namespace coreV1.Namespace
	if err := json.Unmarshal(raw, &namespace); err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	Response(c, http.StatusOK, "success", namespace)
}
// DescribeNamespace returns an overview of one namespace: phase, age,
// requested and available replica totals, and the deployments, clusters and
// domains its workloads are spread over.
//
// Replica totals come from the deployments listed through ClientSet. The
// deployment/cluster/domain sets are built from the OpenSearch hits returned
// by GetDeployFromOS, one domain entry per hit, de-duplicated at the end.
//
// Every failing lookup (namespace, deployment list, domain query) produces a
// 500 response; no code path returns without writing a response.
//
// @Summary Describe a namespace
// @Description Describe a namespace
// @Tags namespace
// @accept json
// @Produce json
// @Param namespace query string true "namespace"
// @Success 200
// @Failure 400
// @Router /api/v1/namespace/describe [get]
func DescribeNamespace(c *gin.Context) {
	namespace := c.Query("namespace")

	nsDescribe, err := ClientSet.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{})
	if err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}

	// Replica totals over all deployments of the namespace.
	deploymentList, err := ClientSet.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	var totalRequired, totalAvailable int32
	for i := range deploymentList.Items {
		deploy := &deploymentList.Items[i]
		// Spec.Replicas is a pointer; skip it when unset instead of panicking.
		if deploy.Spec.Replicas != nil {
			totalRequired += *deploy.Spec.Replicas
		}
		totalAvailable += deploy.Status.AvailableReplicas
	}

	// usageRate guards the division: a zero denominator would yield NaN or
	// Inf, which encoding/json cannot encode.
	usageRate := func(allocated, allocatable int64) float64 {
		if allocatable == 0 {
			return 0
		}
		return float64(allocated) / float64(allocatable)
	}

	// lookupDomain fetches the domain a cluster belongs to. When several rows
	// match, the last one wins; when none match, zero values are returned.
	// The rows are always closed, including on the error paths.
	lookupDomain := func(clusterName string) (int32, string, float64, float64, error) {
		var (
			domainId   int32
			domainName string
			longitude  float64
			latitude   float64
		)
		rows, err := DB.Query("select dc.domain_id,dc.domain_name,d.longitude,d.latitude from domain_cluster dc,domain d where dc.domain_id = d.domain_id and dc.cluster_name = ?", clusterName)
		if err != nil {
			return domainId, domainName, longitude, latitude, err
		}
		defer rows.Close()
		for rows.Next() {
			if err := rows.Scan(&domainId, &domainName, &longitude, &latitude); err != nil {
				return domainId, domainName, longitude, latitude, err
			}
		}
		return domainId, domainName, longitude, latitude, rows.Err()
	}

	// All deployments of the namespace across clusters, from OpenSearch.
	deploys := GetDeployFromOS(namespace, "", "")
	hits, _ := deploys.Get("hits").Get("hits").Array()

	var deploySet []string
	var clusterSet []string
	var domainSet []Domain
	for i := range hits {
		source := deploys.Get("hits").Get("hits").GetIndex(i).Get("_source")
		clusterName, _ := source.Get("metadata").Get("annotations").Get("resource.karmada.io/cached-from-cluster").String()
		deployName, _ := source.Get("metadata").Get("name").String()

		// Resolve the cluster's domain from the database.
		domainId, domainName, longitude, latitude, err := lookupDomain(clusterName)
		if err != nil {
			Response(c, http.StatusInternalServerError, "failed", err)
			return
		}
		clusterSet = append(clusterSet, clusterName)
		deploySet = append(deploySet, deployName+":"+clusterName)

		// Deployments of this namespace that live on this particular cluster.
		var domain Domain
		clusterDeploys := GetDeployFromOS(namespace, "", clusterName)
		clusterHits, _ := clusterDeploys.Get("hits").Get("hits").Array()
		for j := range clusterHits {
			name, _ := clusterDeploys.Get("hits").Get("hits").GetIndex(j).Get("_source").Get("metadata").Get("name").String()
			domain.Deployments = append(domain.Deployments, name)
		}
		domain.Clusters = append(domain.Clusters, clusterName)

		// Clusters unknown to Karmada contribute no domain entry.
		cluster, _, _ := util.GetClusterWithKarmadaClient(KarmadaClient, clusterName)
		if cluster == nil {
			continue
		}

		domain.DomainId = domainId
		domain.DomainName = domainName
		domain.Deployments = removeDuplicateArr(domain.Deployments)
		domain.Location[0] = longitude
		domain.Location[1] = latitude
		// ResourceSummary is optional in the cluster status; leave the rates
		// at zero when it has not been reported.
		if summary := cluster.Status.ResourceSummary; summary != nil {
			domain.CPURate = usageRate(summary.Allocated.Cpu().MilliValue(), summary.Allocatable.Cpu().MilliValue())
			domain.MemoryRate = usageRate(summary.Allocated.Memory().MilliValue(), summary.Allocatable.Memory().MilliValue())
		}
		domainSet = append(domainSet, domain)
	}

	// De-duplicate clusters and domains.
	clusterSet = removeDuplicateArr(clusterSet)
	domainSet = RemoveRepeatedDomain(domainSet)

	nsResult := Namespace{
		NsName:          nsDescribe.Name,
		State:           string(nsDescribe.Status.Phase),
		Age:             strconv.FormatFloat(time.Since(nsDescribe.CreationTimestamp.Time).Hours()/24, 'f', 0, 64) + "d",
		RequirePodNum:   totalRequired,
		AvailablePodNum: totalAvailable,
		Deployments:     deploySet,
		Domains:         domainSet,
		Clusters:        clusterSet,
	}
	Response(c, http.StatusOK, "success", nsResult)
}
// UpdateNamespace updates a namespace (project).
//
// The request body is a Kubernetes namespace object. When its "jcce" label is
// non-empty the update goes through ClientSet; otherwise the object is sent
// to the cluster named by the clusterName query parameter.
//
// @Summary Update a namespace (project)
// @Description Update a namespace (project)
// @Tags namespace
// @accept json
// @Produce json
// @Param param body Namespace true "json"
// @Success 200
// @Failure 400
// @Router /api/v1/namespace/update [post]
func UpdateNamespace(c *gin.Context) {
	clusterName := c.Query("clusterName")

	var namespace coreV1.Namespace
	if bindErr := c.BindJSON(&namespace); bindErr != nil {
		Response(c, http.StatusBadRequest, "invalid request params.", "")
		return
	}

	// No jcce label: update directly on the named cluster.
	if namespace.Labels["jcce"] == "" {
		result := UpdateObject(detailNamespaceConst, clusterName, "", namespace.Name, namespace)
		if result.Error() != nil {
			Response(c, http.StatusInternalServerError, "update namespace failed", result.Error())
			return
		}
		Response(c, http.StatusOK, "success", "")
		return
	}

	// jcce label present: update through ClientSet.
	if _, updateErr := ClientSet.CoreV1().Namespaces().Update(context.TODO(), &namespace, metav1.UpdateOptions{}); updateErr != nil {
		Response(c, http.StatusInternalServerError, "update namespace failed", updateErr)
		return
	}
	Response(c, http.StatusOK, "success", "")
}
// DeleteNamespace deletes a namespace (project).
//
// Route parameters: clusterName, name, label.
// When label equals "jcce" (case-insensitively) the namespace is deleted
// through ClientSet, its row is removed from joint_domain.namespace and the
// propagation policy "namespace.<name>" is deleted. Otherwise the namespace is
// deleted directly on the named cluster.
//
// NOTE(review): on the jcce path the delete error is not forwarded to the
// client (empty data), unlike the other failure responses in this handler.
func DeleteNamespace(c *gin.Context) {
	clusterName, name, label := c.Param("clusterName"), c.Param("name"), c.Param("label")

	if !strings.EqualFold(label, "jcce") {
		result := DeleteObject(detailNamespaceConst, clusterName, "", name)
		if result.Error() != nil {
			Response(c, http.StatusInternalServerError, "delete namespace failed", result.Error())
			return
		}
		Response(c, http.StatusOK, "success", "")
		return
	}

	if deleteErr := ClientSet.CoreV1().Namespaces().Delete(context.TODO(), name, metav1.DeleteOptions{}); deleteErr != nil {
		Response(c, http.StatusInternalServerError, "delete namespace failed", "")
		return
	}
	// Remove the namespace record from the database.
	if _, dbErr := DB.Exec(`delete from joint_domain.namespace where namespace_name = ? `, name); dbErr != nil {
		Response(c, http.StatusInternalServerError, "delete db failed", dbErr)
		return
	}
	// Remove the propagation policy instance.
	DeletePropagationPolicies(name, "namespace."+name)
	Response(c, http.StatusOK, "success", "")
}
// GetNamespaceMetrics returns the memory and CPU usage series of one
// namespace, queried with metric_range_12h and steps_namespace.
//
// One HttpGetMetrics goroutine is started per metric; the handler waits for
// all of them, then drains the result channel into the response.
//
// @Summary Namespace monitoring
// @Description Namespace monitoring
// @Tags namespace
// @accept json
// @Produce json
// @Param clusterName query string true "cluster name"
// @Param namespace query string true "namespace name"
// @Success 200
// @Failure 400
// @Router /api/v1/namespace/getMetrics [get]
func GetNamespaceMetrics(c *gin.Context) {
	clusterName, _ := c.GetQuery("clusterName")
	namespace, _ := c.GetQuery("namespace")

	// Placeholder values handed to GetMetricUrl.
	replaceMap := map[string]string{
		ClusterName: clusterName,
		"nameSpace": namespace,
	}

	memoryUrl := GetMetricUrl(namespace_memory_usage_wo_cache, replaceMap, "nameSpace", metric_range_12h, steps_namespace)
	cpuUrl := GetMetricUrl(namespace_cpu_usage, replaceMap, "nameSpace", metric_range_12h, steps_namespace)
	metricMap := map[string][]MetricUrl{
		"namespace_memory_usage_wo_cache": {{"nameSpace", namespace, memoryUrl}},
		"namespace_cpu_usage":             {{"nameSpace", namespace, cpuUrl}},
	}

	// The channel is buffered with one slot per metric so the workers can
	// deliver before the handler starts reading.
	results := make(chan MetricResult, len(metricMap))
	var wg sync.WaitGroup
	for metricName, urls := range metricMap {
		wg.Add(1)
		go HttpGetMetrics(metricName, urls, &wg, results)
	}
	wg.Wait()
	close(results)

	// Non-nil slice so an empty result encodes as [] rather than null.
	mr := make([]MetricResult, 0, len(metricMap))
	for item := range results {
		mr = append(mr, item)
	}
	Response(c, http.StatusOK, "success", mr)
}
// GetBatchNamespaceMetrics returns memory usage, CPU usage and pod count for
// several namespaces of one cluster in a single call, queried with
// metric_range_1s and steps_1s.
//
// NOTE(review): namespaces is split on "," without trimming; an empty value
// yields a single empty namespace name.
//
// @Summary Batch namespace monitoring
// @Description Batch namespace monitoring
// @Tags namespace
// @accept json
// @Produce json
// @Param clusterName query string true "cluster name"
// @Param namespaces query string true "comma-separated namespace names"
// @Success 200
// @Failure 400
// @Router /api/v1/namespace/getBatchMetrics [get]
func GetBatchNamespaceMetrics(c *gin.Context) {
	clusterName := c.Query("clusterName")
	names := strings.Split(c.Query("namespaces"), ",")

	// One URL list per metric, each with one entry per namespace.
	var memoryUrls, cpuUrls, podCountUrls []MetricUrl
	for _, name := range names {
		replacements := map[string]string{
			ClusterName: clusterName,
			"nameSpace": name,
		}
		memoryUrls = append(memoryUrls, MetricUrl{"nameSpace", name, GetMetricUrl(namespace_memory_usage_wo_cache, replacements, "nameSpace", metric_range_1s, steps_1s)})
		cpuUrls = append(cpuUrls, MetricUrl{"nameSpace", name, GetMetricUrl(namespace_cpu_usage, replacements, "nameSpace", metric_range_1s, steps_1s)})
		podCountUrls = append(podCountUrls, MetricUrl{"nameSpace", name, GetMetricUrl(namespace_pod_count, replacements, "nameSpace", metric_range_1s, steps_1s)})
	}

	metricMap := map[string][]MetricUrl{
		"namespace_memory_usage_wo_cache": memoryUrls,
		"namespace_cpu_usage":             cpuUrls,
		"namespace_pod_count":             podCountUrls,
	}

	// One buffered slot per metric; one HttpGetMetrics goroutine per metric.
	results := make(chan MetricResult, len(metricMap))
	var wg sync.WaitGroup
	for metricName, urls := range metricMap {
		wg.Add(1)
		go HttpGetMetrics(metricName, urls, &wg, results)
	}
	wg.Wait()
	close(results)

	// Non-nil slice so an empty result encodes as [] rather than null.
	mr := make([]MetricResult, 0, len(metricMap))
	for item := range results {
		mr = append(mr, item)
	}
	Response(c, http.StatusOK, "success", mr)
}
// CreateNamespaceByYaml creates a namespace from an uploaded YAML manifest.
//
// The manifest is read from the multipart form field "test", decoded into a
// Kubernetes namespace object and posted to the cluster named "host".
//
// Responds 400 when the upload is missing or the YAML cannot be decoded, 500
// when the file cannot be read or the create request fails, and 200 with the
// PostObject result on success.
func CreateNamespaceByYaml(c *gin.Context) {
	file, err := c.FormFile("test")
	if err != nil {
		Response(c, http.StatusBadRequest, "invalid request params.", "")
		return
	}
	open, err := file.Open()
	if err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	// Release the uploaded file handle on every path.
	defer open.Close()

	all, err := io.ReadAll(open)
	if err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}

	var namespace coreV1.Namespace
	// Check the decode error itself; previously a stale err was tested and a
	// malformed manifest was posted as an empty namespace.
	if err := yaml.Unmarshal(all, &namespace); err != nil {
		Response(c, http.StatusBadRequest, "invalid request params.", "")
		return
	}

	result := PostObject(namespaceConst, "host", "", "", namespace)
	if err := result.Error(); err != nil {
		Response(c, http.StatusInternalServerError, "failed", err)
		return
	}
	Response(c, http.StatusOK, "success", result)
}