This commit is contained in:
jagger 2024-05-06 15:06:42 +08:00
parent 009396a09d
commit e66f11ea99
3 changed files with 44 additions and 16 deletions

View File

@ -14,6 +14,6 @@ func AddCronGroup(c *cron.Cron) {
})
c.AddFunc("0/5 * * * * ?", func() {
PullTaskInfo()
SyncTaskInfo()
})
}

View File

@ -3,22 +3,22 @@ package cron
import (
"encoding/json"
"github.com/go-resty/resty/v2"
coreClient "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"go.uber.org/zap"
"jcc-schedule/global"
"jcc-schedule/service/base"
"strconv"
coreClient "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"time"
)
func PullTaskInfo() {
func SyncTaskInfo() {
httpClient := resty.New().R()
result, err := httpClient.SetHeader("Content-Type", "application/json").
SetQueryParam("adapterId", strconv.FormatInt(global.PCM_CONFIG.System.AdapterId, 10)).
Get(global.PCM_CONFIG.System.CoreServerUrl + "/pcm/v1/core/pullTaskInfo")
if err != nil {
global.PCM_LOG.Error("PullTaskInfo error :", zap.Error(err))
global.PCM_LOG.Error("SyncTaskInfo error :", zap.Error(err))
}
var resp coreClient.PullTaskInfoResp
@ -27,12 +27,36 @@ func PullTaskInfo() {
return
}
var sys = new(base.OperateStruct)
now := time.Now()
if resp.CloudInfoList != nil && len(resp.CloudInfoList) != 0 {
for _, cloudInfoList := range resp.CloudInfoList {
sys.YamlString = cloudInfoList.YamlString
sys.ClusterName = cloudInfoList.ClusterName
sys.Apply()
//TODO 更新任务状态
if cloudInfoList.Status == "Pending" {
cloudInfoList.StartTime = &now
sys.YamlString = cloudInfoList.YamlString
sys.ClusterName = cloudInfoList.ClusterName
err = sys.Apply()
if err != nil {
return
}
cloudInfoList.Status = "Running"
// push submitted mark to coordinator
pushTaskInfo(resp, err, httpClient)
}
}
}
}
func pushTaskInfo(resp coreClient.PullTaskInfoResp, err error, httpClient *resty.Request) {
PushReq := coreClient.PushTaskInfoReq{
AdapterId: global.PCM_CONFIG.System.AdapterId,
CloudInfoList: resp.CloudInfoList,
}
pushResult := coreClient.PushTaskInfoResp{}
_, err = httpClient.SetHeader("Content-Type", "application/json").
SetBody(&PushReq).
SetResult(&pushResult).
Post(global.PCM_CONFIG.System.CoreServerUrl + "/pcm/v1/core/pushTaskInfo")
if err != nil {
global.PCM_LOG.Error("SyncTaskInfo pushTaskInfo error :", zap.Error(err))
}
}

View File

@ -24,7 +24,7 @@ type OperateStruct struct {
YamlString string `json:"yamlString"`
}
func (p *OperateStruct) Apply() {
func (p *OperateStruct) Apply() error {
d := yaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(p.YamlString), 4096)
var err error
for {
@ -35,7 +35,7 @@ func (p *OperateStruct) Apply() {
}
if err != nil {
global.PCM_LOG.Error("Decode err", zap.Error(err))
return
return err
}
obj := &unstructured.Unstructured{}
syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj)
@ -43,19 +43,19 @@ func (p *OperateStruct) Apply() {
unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
global.PCM_LOG.Error("DefaultUnstructuredConverter err", zap.Error(err))
return
return err
}
unStructureObj := &unstructured.Unstructured{Object: unstructuredMap}
gvr, err := GetGVR(apiserver.ApiServer.ClientSetMap[p.ClusterName], unStructureObj.GroupVersionKind())
if err != nil {
global.PCM_LOG.Error("GetGVR err", zap.Error(err))
return
return err
}
unstructuredYaml, err := sigyaml.Marshal(unStructureObj)
if err != nil {
global.PCM_LOG.Error("Marshal err", zap.Error(err))
return
return err
}
if unStructureObj.GetNamespace() == "" {
@ -66,7 +66,7 @@ func (p *OperateStruct) Apply() {
_, createErr := apiserver.ApiServer.DynamicClientMap[p.ClusterName].Resource(gvr).Namespace(unStructureObj.GetNamespace()).Create(context.Background(), unStructureObj, metav1.CreateOptions{})
if createErr != nil {
global.PCM_LOG.Error("createErr err", zap.Error(createErr))
return
return createErr
}
}
@ -82,9 +82,13 @@ func (p *OperateStruct) Apply() {
})
if err != nil {
global.PCM_LOG.Error("err", zap.Error(err))
return
return err
}
}
if err != nil {
return err
}
return nil
}
// GetGVR 获取GVR