diff --git a/internal/logic/inference/createdeploytasklogic.go b/internal/logic/inference/createdeploytasklogic.go index 27f561bc6..bb1b4e0cc 100644 --- a/internal/logic/inference/createdeploytasklogic.go +++ b/internal/logic/inference/createdeploytasklogic.go @@ -3,8 +3,10 @@ package inference import ( "context" "errors" + "fmt" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "strconv" + "sync" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" @@ -40,19 +42,78 @@ func (l *CreateDeployTaskLogic) CreateDeployTask(req *types.CreateDeployTaskReq) Cmd: "", } + duplicated, err := l.svcCtx.Scheduler.AiStorages.IsDeployTaskNameDuplicated(req.TaskName) + if err != nil { + return nil, err + } + if duplicated { + return nil, errors.New("TaskName doesn't exist") + } + taskId, err := l.svcCtx.Scheduler.AiStorages.SaveInferDeployTask(req.TaskName, req.ModelName, req.ModelType, req.TaskDesc) if err != nil { return nil, err } + var clusterlen int + for _, c := range req.AdapterClusterMap { + clusterlen += len(c) + } + var errCh = make(chan interface{}, clusterlen) + var errs []interface{} + + buf := make(chan bool, 2) + var wg sync.WaitGroup for aid, v := range req.AdapterClusterMap { - for _, cid := range v { - err = l.createDeployInstance(taskId, aid, cid, opt) - if err != nil { - return nil, err - } + for _, c := range v { + wg.Add(1) + cid := c + buf <- true + go func() { + err = l.createDeployInstance(taskId, aid, cid, opt) + if err != nil { + e := struct { + err error + clusterId string + }{ + err: err, + clusterId: cid, + } + errCh <- e + wg.Done() + <-buf + return + } + wg.Done() + <-buf + }() } } + wg.Wait() + close(errCh) + + for e := range errCh { + errs = append(errs, e) + } + + if len(errs) != 0 { + var msg string + for _, err := range errs { + e := (err).(struct { + err error + clusterId string + }) + + clusterName, err := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(e.clusterId) + if err != nil { + clusterName = e.clusterId + } + + msg += fmt.Sprintf("CreateInstance Failed # clusterName: %v, error: %v \n", clusterName, e.err.Error()) + + } + return nil, errors.New(msg) + } return } diff --git a/internal/logic/inference/deployinstancelistlogic.go b/internal/logic/inference/deployinstancelistlogic.go index 794673cf3..86b378dfe 100644 --- a/internal/logic/inference/deployinstancelistlogic.go +++ b/internal/logic/inference/deployinstancelistlogic.go @@ -103,11 +103,11 @@ func (l *DeployInstanceListLogic) GenerateDeployTasks(tasklist []*models.AiDeplo return nil, errors.New(err.Error()) } if len(list) == 0 { - err := l.svcCtx.Scheduler.AiStorages.DeleteDeployTaskById(t.Id) - if err != nil { - logx.Errorf("db DeleteByDeployTaskId error") - return nil, errors.New(err.Error()) - } + //err := l.svcCtx.Scheduler.AiStorages.DeleteDeployTaskById(t.Id) + //if err != nil { + // logx.Errorf("db DeleteByDeployTaskId error") + // return nil, errors.New(err.Error()) + //} continue } deployTask := &DeployTask{ diff --git a/internal/scheduler/database/aiStorage.go b/internal/scheduler/database/aiStorage.go index 3cc70c7b0..74b5b0bc1 100644 --- a/internal/scheduler/database/aiStorage.go +++ b/internal/scheduler/database/aiStorage.go @@ -604,3 +604,16 @@ func (s *AiStorage) GetRunningDeployInstanceById(id int64, adapterId string) ([] } return list, nil } + +func (s *AiStorage) IsDeployTaskNameDuplicated(name string) (bool, error) { + var total int32 + tx := s.DbEngin.Raw("select count(*) from ai_deploy_instance_task where `name` = ?", name).Scan(&total) + if tx.Error != nil { + return false, tx.Error + } + if total == 0 { + return false, nil + } + + return true, nil +}