zhangwei 2024-04-18 16:14:30 +08:00
parent d2919cf2d6
commit 2b1496e501
8 changed files with 136 additions and 37 deletions

View File

@@ -1,5 +1,5 @@
name: pcm-kubernetes
port: 2005
port: 8082
adapterId: 1770658294298316800
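
The etc package that consumes this file is not part of the diff; the sketch below is a guess at the config struct behind etc.ServerConf, using go-zero's conf loader (which binds YAML keys via json tags). Field names follow usages elsewhere in this commit (etc.ServerConf.Port, .AdapterId, .CoreServerUrl); CoreServerUrl is assumed from the sync code, not from this file.

package etc

import "github.com/zeromicro/go-zero/core/conf"

// Config mirrors the YAML above; AdapterId is int64 because the sync
// payloads send it as a number.
type Config struct {
    Name          string `json:"name"`
    Port          int    `json:"port"`
    AdapterId     int64  `json:"adapterId"`
    CoreServerUrl string `json:"coreServerUrl"` // assumed: target of the cron pushes
}

var ServerConf Config

// MustLoad fills ServerConf at startup; conf.MustLoad panics on a bad file.
func MustLoad(path string) {
    conf.MustLoad(path, &ServerConf)
}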

View File

@@ -2,9 +2,11 @@ package main
import (
"github.com/robfig/cron/v3"
"jcc-schedule/etc"
"jcc-schedule/pkg/apiserver"
internalCron "jcc-schedule/pkg/cron"
"jcc-schedule/routers"
"strconv"
)
// @title jcc Scheduling Center
@@ -17,5 +19,5 @@ func main() {
c.Start()
internalCron.AddCronGroup(c)
_ = router.Run(":8082")
_ = router.Run(":" + strconv.Itoa(etc.ServerConf.Port))
}

View File

@@ -9,8 +9,8 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
"strconv"
"time"
)
type APIServer struct {
@@ -75,8 +75,8 @@ func (s *APIServer) installK8sClient() {
for _, cluster := range s.Clusters {
if len(cluster.Server) != 0 && len(cluster.BearerToken) != 0 {
restConfig := &rest.Config{
Timeout: 10 * time.Second,
Host: cluster.Server,
RateLimiter: flowcontrol.NewTokenBucketRateLimiter(1000, 1000),
BearerToken: cluster.BearerToken,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
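
This hunk appears to trade the fixed 10-second client timeout for a much higher client-side rate limit; client-go defaults to 5 QPS with a burst of 10, so heavy polling quickly throttles without this. A minimal, self-contained sketch of the resulting client construction, with server and token standing in for cluster.Server and cluster.BearerToken:

package main

import (
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/util/flowcontrol"
)

// newClients mirrors the installK8sClient logic above: one rest.Config per
// cluster, with client-side throttling effectively disabled.
func newClients(server, token string) (*kubernetes.Clientset, dynamic.Interface, error) {
    cfg := &rest.Config{
        Host:        server,
        BearerToken: token,
        // 1000 QPS with a burst of 1000 is generous enough that the
        // limiter rarely blocks metric collection.
        RateLimiter:     flowcontrol.NewTokenBucketRateLimiter(1000, 1000),
        TLSClientConfig: rest.TLSClientConfig{Insecure: true},
    }
    clientSet, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        return nil, nil, err
    }
    dynamicClient, err := dynamic.NewForConfig(cfg)
    if err != nil {
        return nil, nil, err
    }
    return clientSet, dynamicClient, nil
}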

View File

@@ -5,7 +5,11 @@ import (
)
func AddCronGroup(c *cron.Cron) {
c.AddFunc("0/2 * * * * ?", func() {
c.AddFunc("0/15 * * * * ?", func() {
syncClusterLoadRecords()
})
c.AddFunc("0/15 * * * * ?", func() {
syncClusterAlertRecords()
})
}
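
robfig/cron only accepts this six-field, seconds-first spec when the parser is created with seconds support, and ? is accepted as an alias for * in the day fields. A small sketch of the wiring this file assumes, mirroring the main.go hunk above:

package main

import (
    "github.com/robfig/cron/v3"
    internalCron "jcc-schedule/pkg/cron"
)

func main() {
    // cron.WithSeconds enables the 6-field spec (sec min hour dom month dow),
    // so "0/15 * * * * ?" fires at :00, :15, :30 and :45 of every minute.
    c := cron.New(cron.WithSeconds())
    internalCron.AddCronGroup(c)
    c.Start()
    select {} // the real service blocks on router.Run instead
}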

View File

@@ -1,17 +1,20 @@
package cron
import (
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/zeromicro/go-zero/core/logx"
"jcc-schedule/etc"
"jcc-schedule/pkg/apiserver"
"jcc-schedule/pkg/monitoring"
v1 "jcc-schedule/routers/api/v1"
"sync"
)
type SyncClusterLoadReq struct {
ClusterLoadRecords []ClusterLoadRecord `json:"clusterLoadRecords"`
}
type ClusterLoadRecord struct {
AdapterId int64 `json:"adapterId"`
ClusterName string `json:"clusterName"`
@@ -24,6 +27,47 @@ type ClusterLoadRecord struct {
DiskAvail float64 `json:"diskAvail"`
DiskTotal float64 `json:"diskTotal"`
DiskUtilisation float64 `json:"diskUtilisation"`
PodsUtilisation float64 `json:"podsUtilisation"`
}
type SyncClusterAlertReq struct {
AlertRecordsMap map[string][]*promv1.Alert `json:"alertRecordsMap"`
}
func syncClusterAlertRecords() {
    res := make(map[string][]*promv1.Alert)
    var mtx sync.Mutex
    var wg sync.WaitGroup
    for k, v := range apiserver.ApiServer.MonitoringClientMap {
        wg.Add(1)
        go func(key string, client monitoring.Interface) {
            defer wg.Done()
            rules, err := client.GetExceptionRules()
            if err != nil {
                logx.Error(err)
                return
            }
            // serialise writes to the shared map (concurrent map writes are
            // a data race) and use the key parameter, not the loop variable k
            mtx.Lock()
            res[key] = rules
            mtx.Unlock()
        }(k, v)
    }
    wg.Wait()
// push data
req := SyncClusterAlertReq{
AlertRecordsMap: res,
}
resp, err := apiserver.ApiServer.HttpClient.R().
SetBody(&req).
ForceContentType("application/json").
Post(etc.ServerConf.CoreServerUrl + "/pcm/v1/core/syncClusterAlert")
if err != nil {
logx.Error(err)
}
if resp.StatusCode() != 200 {
logx.Error(resp.String())
}
}
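
A note on the goroutine signature used in both sync functions: before Go 1.22, a range loop reuses one variable per iteration, so a closure that reads k or v directly may see another iteration's values by the time it runs. Passing them as arguments snapshots them, and the shared result map still needs a mutex. A self-contained illustration:

package main

import (
    "fmt"
    "sync"
)

// Demonstrates the pattern used above: snapshot loop variables as goroutine
// arguments and guard the shared map with a mutex.
func main() {
    in := map[string]int{"a": 1, "b": 2, "c": 3}
    out := make(map[string]int)
    var mtx sync.Mutex
    var wg sync.WaitGroup
    for k, v := range in {
        wg.Add(1)
        go func(key string, val int) {
            defer wg.Done()
            mtx.Lock()
            out[key] = val * 10 // writing out[k] here would read the shared loop variable pre-1.22
            mtx.Unlock()
        }(k, v)
    }
    wg.Wait()
    fmt.Println(out)
}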
func syncClusterLoadRecords() {
@@ -38,11 +82,21 @@ func syncClusterLoadRecords() {
wg.Add(1)
go func(clusterName string, client monitoring.Interface) {
defer wg.Done()
// prometheus data
clusterResourceLoad, err := fetchClusterResourceLoad(clusterName, client)
if err != nil {
logx.Error(err)
return
}
            // pod utilisation (only when a clientset exists for this cluster)
            if clientSet, ok := apiserver.ApiServer.ClientSetMap[clusterName]; ok {
                podUtilisation, err := v1.PodUtilisation(clientSet)
                if err != nil {
                    logx.Error(err)
                } else {
                    clusterResourceLoad.PodsUtilisation = podUtilisation
                }
            }
res = append(res, clusterResourceLoad)
}(clusterName, client)
@@ -64,67 +118,74 @@ func syncClusterLoadRecords() {
}
}
func fetchClusterResourceLoad(clusterName string, client monitoring.Interface) (ClusterLoadRecord, error) {
func fetchClusterResourceLoad(clusterName string, promClient monitoring.Interface) (ClusterLoadRecord, error) {
clusterResourceLoad := ClusterLoadRecord{
AdapterId: etc.ServerConf.AdapterId,
ClusterName: clusterName,
}
// cpu utilisation
cpuUtilisationValue, err := client.GetRawData("cluster_cpu_utilisation")
cpuUtilisationValue, err := promClient.GetRawData("cluster_cpu_utilisation")
if err != nil {
return clusterResourceLoad, err
}
cpuUtilisationData, _ := cpuUtilisationValue.(model.Vector)
clusterResourceLoad.CpuUtilisation = float64(cpuUtilisationData[0].Value)
cpuUtilisationData, ok := cpuUtilisationValue.(model.Vector)
if ok && cpuUtilisationData.Len() != 0 {
clusterResourceLoad.CpuUtilisation = float64(cpuUtilisationData[0].Value)
}
// cpu total
cpuTotalValue, err := client.GetRawData("cluster_cpu_total")
cpuTotalValue, err := promClient.GetRawData("cluster_cpu_total")
if err != nil {
return clusterResourceLoad, err
}
cpuTotalData, _ := cpuTotalValue.(model.Vector)
clusterResourceLoad.CpuTotal = float64(cpuTotalData[0].Value)
cpuTotalData, ok := cpuTotalValue.(model.Vector)
if ok && cpuTotalData.Len() != 0 {
clusterResourceLoad.CpuTotal = float64(cpuTotalData[0].Value)
}
    // cpu available: derive from the already-guarded utilisation value
    // instead of indexing cpuUtilisationData, which may be empty
    clusterResourceLoad.CpuAvail = clusterResourceLoad.CpuTotal * (1 - clusterResourceLoad.CpuUtilisation)
// memory available
memAvailValue, err := client.GetRawData("cluster_memory_avail")
memAvailValue, err := promClient.GetRawData("cluster_memory_avail")
if err != nil {
return clusterResourceLoad, err
}
memAvailData, _ := memAvailValue.(model.Vector)
clusterResourceLoad.MemoryAvail = float64(memAvailData[0].Value)
memAvailData, ok := memAvailValue.(model.Vector)
if ok && memAvailData.Len() != 0 {
clusterResourceLoad.MemoryAvail = float64(memAvailData[0].Value)
}
// memory total
memTotalValue, err := client.GetRawData("cluster_memory_total")
memTotalValue, err := promClient.GetRawData("cluster_memory_total")
if err != nil {
return clusterResourceLoad, err
}
memTotalData, _ := memTotalValue.(model.Vector)
clusterResourceLoad.MemoryTotal = float64(memTotalData[0].Value)
memTotalData, ok := memTotalValue.(model.Vector)
if ok && memTotalData.Len() != 0 {
clusterResourceLoad.MemoryTotal = float64(memTotalData[0].Value)
}
    // memory utilisation (guard against a zero total to avoid NaN)
    if clusterResourceLoad.MemoryTotal != 0 {
        clusterResourceLoad.MemoryUtilisation = (clusterResourceLoad.MemoryTotal - clusterResourceLoad.MemoryAvail) / clusterResourceLoad.MemoryTotal
    }
// disk avail
diskAvailValue, err := client.GetRawData("cluster_disk_avail")
diskAvailValue, err := promClient.GetRawData("cluster_disk_avail")
if err != nil {
return clusterResourceLoad, err
}
diskAvailData, _ := diskAvailValue.(model.Vector)
clusterResourceLoad.DiskAvail = float64(diskAvailData[0].Value)
diskAvailData, ok := diskAvailValue.(model.Vector)
if ok && diskAvailData.Len() != 0 {
clusterResourceLoad.DiskAvail = float64(diskAvailData[0].Value)
}
// disk total
diskTotalValue, err := client.GetRawData("cluster_disk_total")
diskTotalValue, err := promClient.GetRawData("cluster_disk_total")
if err != nil {
return clusterResourceLoad, err
}
diskTotalData, _ := diskTotalValue.(model.Vector)
clusterResourceLoad.DiskTotal = float64(diskTotalData[0].Value)
diskTotalData, ok := diskTotalValue.(model.Vector)
if ok && diskTotalData.Len() != 0 {
clusterResourceLoad.DiskTotal = float64(diskTotalData[0].Value)
}
    // disk utilisation (guard against a zero total to avoid NaN)
    if clusterResourceLoad.DiskTotal != 0 {
        clusterResourceLoad.DiskUtilisation = (clusterResourceLoad.DiskTotal - clusterResourceLoad.DiskAvail) / clusterResourceLoad.DiskTotal
    }
return clusterResourceLoad, nil
}
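
The six assert-and-check blocks above follow one pattern: type-assert the instant-query result to model.Vector and read the first sample only if it exists. A hypothetical helper (not in this commit) that would collapse each block to two lines:

package cron

import "github.com/prometheus/common/model"

// firstSample returns the first sample of an instant vector, and false when
// the result is not a vector or is empty.
func firstSample(v model.Value) (float64, bool) {
    vec, ok := v.(model.Vector)
    if !ok || vec.Len() == 0 {
        return 0, false
    }
    return float64(vec[0].Value), true
}

With it, each metric reads as: if val, ok := firstSample(cpuTotalValue); ok { clusterResourceLoad.CpuTotal = val }.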

View File

@@ -37,6 +37,7 @@ func ParseTime(timestamp string) (time.Time, error) {
}
func (p Prometheus) GetExceptionRules() ([]*v1.Alert, error) {
rules, err := p.client.Rules(context.Background())
if err != nil {
return nil, err
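
The hunk cuts off here. A plausible continuation, based on the prometheus/client_golang v1 API (the real filter may differ, e.g. keeping only firing alerts), walks the rule groups and collects alerts from alerting rules:

    var alerts []*v1.Alert
    for _, group := range rules.Groups {
        for _, r := range group.Rules {
            // group.Rules holds interface{} values; alerting rules carry
            // the active alerts we want
            if alertingRule, ok := r.(v1.AlertingRule); ok {
                alerts = append(alerts, alertingRule.Alerts...)
            }
        }
    }
    return alerts, nil
}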

View File

@@ -132,21 +132,20 @@ func AlertRuleList(c *gin.Context) {
return
}
res := make(map[string][]*v1.Alert)
var mtx sync.Mutex
var wg sync.WaitGroup
for k, v := range apiserver.ApiServer.MonitoringClientMap {
wg.Add(1)
go func() {
go func(key string, client monitoring.Interface) {
defer wg.Done()
rules, err := v.GetExceptionRules()
rules, err := client.GetExceptionRules()
if err != nil {
return
}
mtx.Lock()
res[key] = rules
mtx.Unlock()
}()
}(k, v)
}
wg.Wait()
Response(c, http.StatusOK, "success", res)

View File

@@ -1,5 +1,37 @@
package v1
func NodePod() {
import (
    "context"
    "github.com/zeromicro/go-zero/core/logx"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "time"
)
func PodUtilisation(client *kubernetes.Clientset) (float64, error) {
    start := time.Now()
    // sum pod capacity across all nodes
    var maxPods int64
    nodeList, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
    if err != nil {
        logx.Error(err, "Failed to get node list")
        return 0, err
    }
    for _, node := range nodeList.Items {
        maxPods += node.Status.Capacity.Pods().Value()
    }
    if maxPods == 0 {
        // no reported capacity; avoid dividing by zero
        return 0, nil
    }
    podList, err := client.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
    if err != nil {
        logx.Error(err, "Failed to get pod list")
        return 0, err
    }
    if len(podList.Items) == 0 {
        return 0, nil
    }
    logx.Infof("PodUtilisation took %s", time.Since(start))
    return float64(len(podList.Items)) / float64(maxPods), nil
}
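
A usage sketch, assuming a clientset built the same way installK8sClient does (host and token are placeholders):

package main

import (
    "fmt"

    v1 "jcc-schedule/routers/api/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    cfg := &rest.Config{
        Host:            "https://10.0.0.1:6443", // placeholder for cluster.Server
        BearerToken:     "…",                     // placeholder for cluster.BearerToken
        TLSClientConfig: rest.TLSClientConfig{Insecure: true},
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }
    u, err := v1.PodUtilisation(client)
    if err != nil {
        panic(err)
    }
    fmt.Printf("pods running / pod capacity = %.2f\n", u)
}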