Merge pull request 'updated createdeploytask logics' (#316) from tzwang/pcm-coordinator:master into master

This commit is contained in:
tzwang 2024-09-23 10:53:32 +08:00
commit f1f1697efe
3 changed files with 84 additions and 10 deletions

View File

@ -3,8 +3,10 @@ package inference
import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"strconv"
"sync"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
@ -40,19 +42,78 @@ func (l *CreateDeployTaskLogic) CreateDeployTask(req *types.CreateDeployTaskReq)
Cmd: "",
}
duplicated, err := l.svcCtx.Scheduler.AiStorages.IsDeployTaskNameDuplicated(req.TaskName)
if err != nil {
return nil, err
}
if duplicated {
return nil, errors.New("TaskName doesn't exist")
}
taskId, err := l.svcCtx.Scheduler.AiStorages.SaveInferDeployTask(req.TaskName, req.ModelName, req.ModelType, req.TaskDesc)
if err != nil {
return nil, err
}
var clusterlen int
for _, c := range req.AdapterClusterMap {
clusterlen += len(c)
}
var errCh = make(chan interface{}, clusterlen)
var errs []interface{}
buf := make(chan bool, 2)
var wg sync.WaitGroup
for aid, v := range req.AdapterClusterMap {
for _, cid := range v {
err = l.createDeployInstance(taskId, aid, cid, opt)
if err != nil {
return nil, err
}
for _, c := range v {
wg.Add(1)
cid := c
buf <- true
go func() {
err = l.createDeployInstance(taskId, aid, cid, opt)
if err != nil {
e := struct {
err error
clusterId string
}{
err: err,
clusterId: cid,
}
errCh <- e
wg.Done()
<-buf
return
}
wg.Done()
<-buf
}()
}
}
wg.Wait()
close(errCh)
for e := range errCh {
errs = append(errs, e)
}
if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
clusterName, err := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(e.clusterId)
if err != nil {
clusterName = e.clusterId
}
msg += fmt.Sprintf("CreateInstance Failed # clusterName: %v, error: %v \n", clusterName, e.err.Error())
}
return nil, errors.New(msg)
}
return
}

View File

@ -103,11 +103,11 @@ func (l *DeployInstanceListLogic) GenerateDeployTasks(tasklist []*models.AiDeplo
return nil, errors.New(err.Error())
}
if len(list) == 0 {
err := l.svcCtx.Scheduler.AiStorages.DeleteDeployTaskById(t.Id)
if err != nil {
logx.Errorf("db DeleteByDeployTaskId error")
return nil, errors.New(err.Error())
}
//err := l.svcCtx.Scheduler.AiStorages.DeleteDeployTaskById(t.Id)
//if err != nil {
// logx.Errorf("db DeleteByDeployTaskId error")
// return nil, errors.New(err.Error())
//}
continue
}
deployTask := &DeployTask{

View File

@ -604,3 +604,16 @@ func (s *AiStorage) GetRunningDeployInstanceById(id int64, adapterId string) ([]
}
return list, nil
}
func (s *AiStorage) IsDeployTaskNameDuplicated(name string) (bool, error) {
var total int32
tx := s.DbEngin.Raw("select count(*) from ai_deploy_instance_task where `name` = ?", name).Scan(&total)
if tx.Error != nil {
return false, tx.Error
}
if total == 0 {
return false, nil
}
return true, nil
}