From 329066fbdde2e59418fc971688be789278fe8063 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Wed, 8 Nov 2023 16:13:08 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etc/kubernetes.yaml | 29 ++++++++++------- internal/cron/cron.go | 2 +- internal/cron/taskcron.go | 68 ++++++++++++++++++++++----------------- kubernetes.go | 59 --------------------------------- 4 files changed, 56 insertions(+), 102 deletions(-) diff --git a/etc/kubernetes.yaml b/etc/kubernetes.yaml index 3b2ae82..dc72ce3 100644 --- a/etc/kubernetes.yaml +++ b/etc/kubernetes.yaml @@ -1,25 +1,30 @@ Name: pcm.kubenative.rpc -ListenOn: 0.0.0.0:2003 +ListenOn: 0.0.0.0:2005 Participant: - Name: 内网15.3 - Address: 10.101.15.3:6443 - RpcAddress: 0.0.0.0:2003 - Type: "0" - TenantId: 1 - TenantName: nudt-pdl - Labels: {"nudt":"hpc"} - MetricsUrl: http://10.101.15.3:32585 + Name: ali + Address: http://121.89.194.135:6443 + RpcAddress: 0.0.0.0:2005 + Type: "CLOUD" + TenantId: 3 + TenantName: ali + Labels: { "cloud": "ali" } + MetricsUrl: http://121.89.194.135:31965 KubeConfig: - Server: 10.101.15.3:6443 - Token: eyJhbGciOiJSUzI1NiIsImtpZCI6IllCbUxDUnBlNG1FWnh2aVRpcHpHdU1WT1ViZEpkRVBJcFVHcjJlRUk5WUEifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJrdWJlLW5hdGl2ZS10b2tlbi05OTRjMiIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50Lm5hbWUiOiJrdWJlLW5hdGl2ZSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6IjdkZWY5MmFmLTU4ZDAtNGQwMy04M2ZmLTgxYTIzYWQzNDY3ZCIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprdWJlLXN5c3RlbTprdWJlLW5hdGl2ZSJ9.EI6ZEBEZysdxQHe4_I6eu4gTKymgZxuOAE6CSjK4JT0jXr2CLuXW0V9Nw3FZjEfWR7F2fEVBEnFkOOqvsfWIWOwduATV8t4LfZwCzBJZArTOw4AvV-WsYiyaJXQhx0AnowSJTZyY55ZuSenUFf6ZrDhvlkUGkTNU9fmNMzWpcjwcYKrI_ZoI9pu0mXtwr0RdjvklQqaR1bGI7m1j6uWir2NZrvgJt0-kdsFyRtAzZjiCSHtf2YO6Cm5-w2zPJp-SoNvaO97cJy_wjGIrUHrP-CwqZ0m_-SzZ23Z-sH91U1zGrKD6dso-5-qOArPODrRr-GNG5kTrKkLyRnsAOm7vvw + Server: 121.89.194.135:6443 + Token: eyJhbGciOiJSUzI1NiIsImtpZCI6IjdyeU1HdUxNYTVxLWRzMWY1UGdOV1pjVnQxUXRETXRXZzJyczlIeHVtLXMifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJwY20tdGVzdC10b2tlbi1mNjhjdCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50Lm5hbWUiOiJwY20tdGVzdCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6ImEzZTM3Zjc4LTFmNmQtNDQzZC1hY2VkLWU2NjFiZGRmNDQ1NCIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprdWJlLXN5c3RlbTpwY20tdGVzdCJ9.EmoQJuwXAtgJ5c_CtyMxe2AtWei67i5Ljb3Fw6qGrCmy67Yn0Nhh31Pdwdo0yjJ7z-jg2LrYH9pZoH1EtcqIOBtJahkdzPM87TuSoZe0AdEbI2HaKNuPs_d81v9jhDAwYYV_kfbtFmbq3CtibPqIhqu4GAdWvJYXWN3_XzUrXP6tLcBkQLxnMyAWnwTPt7aX2bM0_peect4a-W51IrQbeNkpW6LCA6-_LFKo-sAwit95H8ZsRVcgM7Zz5sNoqB00PafLsOoJCulfo9n_AEay6jTDhOHmQfJsRa7VbIHIM4XOXO8P8IV_63Dm0Ru8Um8C5Dhssf3pMuldg6zjN9MyNw + + + # core rpc PcmCoreRpcConf: Endpoints: - 119.45.100.73:32456 +# Endpoints: +# - 0.0.0.0:2004 NonBlock: true -ParticipantId: 1713882658292895744 \ No newline at end of file +ParticipantId: 1696118513460056064 \ No newline at end of file diff --git a/internal/cron/cron.go b/internal/cron/cron.go index 5b24099..d6d6b80 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -18,6 +18,6 @@ func AddCronGroup(svc *svc.ServiceContext) { // 推送节点动态信息 svc.Cron.AddFunc("0 0/2 * * * ?", func() { - tracker.NodesDynamicInfo(svc.Config.MetricsUrl, svc.ParticipantRpc, svc.ClientSet) + tracker.NodesDynamicInfo(svc.Config.Participant.MetricsUrl, svc.ParticipantRpc, svc.ClientSet) }) } diff --git a/internal/cron/taskcron.go b/internal/cron/taskcron.go index 8168e72..47c727a 100644 --- a/internal/cron/taskcron.go +++ b/internal/cron/taskcron.go @@ -15,11 +15,6 @@ import ( ) func SyncTask(svc *svc.ServiceContext) { - - applyYamlLogic := logic.NewApplyYamlLogic(context.Background(), svc) - - deleteYamlLogic := logic.NewDeleteYamlLogic(context.Background(), svc) - participantId, err := tool.GetParticipantId("etc/kubernetes.yaml") if err != nil { return @@ -33,35 +28,19 @@ func SyncTask(svc *svc.ServiceContext) { logx.Error(err) return } + // 遍历执行任务操作 for index, _ := range infoList.CloudInfoList { // 删除任务 if infoList.CloudInfoList[index].Status == "WaitDelete" { - deleteReq := kubernetesclient.ApplyReq{ - YamlString: infoList.CloudInfoList[index].YamlString, - } - _, err := deleteYamlLogic.DeleteYaml(&deleteReq) - if err != nil { - infoList.CloudInfoList[index].Status = "DeleteError" - infoList.CloudInfoList[index].Result = err.Error() - continue - } - infoList.CloudInfoList[index].Status = "Deleted" + delete(infoList.CloudInfoList[index], svc) } + // 执行任务 if infoList.CloudInfoList[index].Status == "Saved" { - // 提交任务 - applyReq := kubernetesclient.ApplyReq{ - YamlString: infoList.CloudInfoList[index].YamlString, - } - infoList.CloudInfoList[index].Status = "Pending" - _, err := applyYamlLogic.ApplyYaml(&applyReq) - if err != nil { - infoList.CloudInfoList[index].Status = "Failed" - infoList.CloudInfoList[index].Result = err.Error() - } + apply(infoList.CloudInfoList[index], svc) } } - // 遍历任务信息 + // 遍历查询任务信息 for index, _ := range infoList.CloudInfoList { if infoList.CloudInfoList[index].Kind == "Deployment" { DeploymentHandler(infoList.CloudInfoList[index], svc) @@ -70,9 +49,8 @@ func SyncTask(svc *svc.ServiceContext) { JobHandler(infoList.CloudInfoList[index], svc) } } - + // 同步信息到core端 if len(infoList.CloudInfoList) != 0 { - // 同步信息到core端 SyncInfoReq := pcmCore.SyncInfoReq{ ParticipantId: participantId, CloudInfoList: infoList.CloudInfoList, @@ -85,6 +63,36 @@ func SyncTask(svc *svc.ServiceContext) { } } +// 删除资源 +func delete(cloud *pcmCore.CloudInfo, svc *svc.ServiceContext) { + deleteYamlLogic := logic.NewDeleteYamlLogic(context.Background(), svc) + deleteReq := kubernetesclient.ApplyReq{ + YamlString: cloud.YamlString, + } + _, err := deleteYamlLogic.DeleteYaml(&deleteReq) + if err != nil { + cloud.Status = "DeleteError" + cloud.Result = err.Error() + return + } + cloud.Status = "Deleted" +} + +// 执行资源 +func apply(cloud *pcmCore.CloudInfo, svc *svc.ServiceContext) { + applyYamlLogic := logic.NewApplyYamlLogic(context.Background(), svc) + // 提交任务 + applyReq := kubernetesclient.ApplyReq{ + YamlString: cloud.YamlString, + } + cloud.Status = "Pending" + _, err := applyYamlLogic.ApplyYaml(&applyReq) + if err != nil { + cloud.Status = "Failed" + cloud.Result = err.Error() + } +} + func DeploymentHandler(cloudInfo *pcmCore.CloudInfo, svc *svc.ServiceContext) { // 遍历core端任务列表信息 deploymentList, err := svc.ClientSet.AppsV1().Deployments("").List(context.Background(), metav1.ListOptions{}) @@ -125,11 +133,11 @@ func JobHandler(cloudInfo *pcmCore.CloudInfo, svc *svc.ServiceContext) { for _, item := range job.Status.Conditions { if item.Type == "Failed" && item.Status == "True" { cloudInfo.Status = "Failed" - continue + return } if item.Type == "Complete" && item.Status == "True" { cloudInfo.Status = "Completed" - continue + return } } } diff --git a/kubernetes.go b/kubernetes.go index 6273af5..9d2968d 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -87,62 +87,3 @@ func PushParticipantInfo(config config.Config, participantService participantser // 更新本地配置文件ParticipantId tool.UpdateParticipantId(*configFile, strconv.FormatInt(resp.ParticipantId, 10)) } - -//func SetupInformer(clientSet *k8s.Clientset) (cache.SharedIndexInformer, cache.SharedIndexInformer, cache.SharedIndexInformer, error) { -// informerFactory := informers.NewSharedInformerFactory(clientSet, time.Second*30) -// -// jobInformer := informerFactory.Batch().V1().Jobs().Informer() -// deploymentInformer := informerFactory.Apps().V1().Deployments().Informer() -// podInformer := informerFactory.Core().V1().Pods().Informer() -// -// return jobInformer, deploymentInformer, podInformer, nil -//} -// -//func Operate(clientSet *k8s.Clientset) { -// jobInformer, deploymentInformer, podInformer, err := SetupInformer(clientSet) -// if err != nil { -// return -// } -// stopCh := make(chan struct{}) -// defer close(stopCh) -// -// go jobInformer.Run(stopCh) -// go deploymentInformer.Run(stopCh) -// go podInformer.Run(stopCh) -// if !cache.WaitForCacheSync(stopCh, jobInformer.HasSynced, deploymentInformer.HasSynced) { -// panic("Timed out waiting for caches to sync") -// } -// -// podInformer.han(cache.ResourceEventHandlerFuncs{ -// AddFunc: func(obj interface{}) { -// job := obj.(*v1.Pod) -// println(job.Name) -// }, -// UpdateFunc: func(oldObj, newObj interface{}) { -// job := newObj.(*v12.Job) -// println(job.Name) -// }, -// }) -// -// jobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ -// AddFunc: func(obj interface{}) { -// job := obj.(*v12.Job) -// println(job.Name) -// }, -// UpdateFunc: func(oldObj, newObj interface{}) { -// job := newObj.(*v12.Job) -// println(job.Name) -// }, -// }) -// -// deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ -// AddFunc: func(obj interface{}) { -// deployment := obj.(*v1.Deployment) -// println(deployment.Name) -// }, -// UpdateFunc: func(oldObj, newObj interface{}) { -// deployment := newObj.(*v1.Deployment) -// println(deployment.Name) -// }, -// }) -//}