Merge remote-tracking branch 'origin/2.0' into 2.0_lee_2.0

# Conflicts:
#	adaptor/PCM-HPC/PCM-TH/rpc/etc/hpcth.yaml
This commit is contained in:
ll15074821352 2023-04-23 15:33:26 +08:00
commit aacd2deeba
63 changed files with 3171 additions and 1890 deletions

View File

@ -2,7 +2,9 @@ NacosConfig:
DataId: pcm-hanwuji-rpc.yaml
Group: DEFAULT_GROUP
ServerConfigs:
- IpAddr: 127.0.0.1
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: 10.101.15.7
Port: 8848
- IpAddr: nacos-headless
Port: 8848

View File

@ -2,7 +2,9 @@ NacosConfig:
DataId: pcm-modelarts-rpc.yaml
Group: DEFAULT_GROUP
ServerConfigs:
- IpAddr: 127.0.0.1
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: 10.101.15.7
Port: 8848
- IpAddr: nacos-headless
Port: 8848

View File

@ -0,0 +1,130 @@
package logic
import (
"PCM/adaptor/PCM-AI/PCM-MODELARTS/rpc/internal/svc"
"PCM/adaptor/PCM-AI/PCM-MODELARTS/rpc/modelarts"
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"context"
"github.com/zeromicro/go-zero/core/logx"
"time"
)
func InitCron(svc *svc.ServiceContext) {
submitJobLogic := NewCreateTrainingJobLogic(context.Background(), svc)
listLogic := NewGetListTrainingJobsLogic(context.Background(), svc)
svc.Cron.AddFunc("*/5 * * * * ?", func() {
syncInfoReq := pcmcoreclient.SyncInfoReq{
Kind: "ai",
ServiceName: "modelArts",
}
// 查询core端分发下来的任务列表
infoList, err := queryCoreInfoList(svc)
if err != nil {
logx.Error(err)
return
}
// 提交任务
submitJob(infoList, submitJobLogic)
// 查询运行中的任务列表同步信息
listReq := modelarts.ListTrainingJobsreq{
ProjectId: "0a62ffb0d48026c12fbfc011b8d23f0b",
Limit: 10,
OffSet: 0,
}
listJob, err := listLogic.GetListTrainingJobs(&listReq)
if err != nil {
logx.Error(err)
return
}
for index, _ := range infoList.AiInfoList {
for _, job := range listJob.Items {
if job.Metadata.Name == infoList.AiInfoList[index].Name {
infoList.AiInfoList[index].ProjectId = job.ProjectId
infoList.AiInfoList[index].JobId = job.Metadata.Id
createTime := time.Unix(int64(job.Metadata.CreateTime)/1000, 0)
infoList.AiInfoList[index].CreateTime = time.Time.String(createTime)
if job.Status.StartTime != 0 {
startTime := time.Unix(int64(job.Status.StartTime)/1000, 0)
infoList.AiInfoList[index].StartTime = time.Time.String(startTime)
}
infoList.AiInfoList[index].RunningTime = int64(job.Status.Duration) / 1000
if job.Status.Phase == "Running" {
infoList.AiInfoList[index].Status = "Running"
}
if job.Status.Phase == "Completed" {
infoList.AiInfoList[index].Status = "Completed"
}
if job.Status.Phase == "Creating" {
infoList.AiInfoList[index].Status = "Creating"
}
if job.Status.Phase == "Pending" {
infoList.AiInfoList[index].Status = "Pending"
}
if job.Status.Phase == "Terminated" {
infoList.AiInfoList[index].Status = "Terminated"
}
if job.Status.Phase == "Terminating" {
infoList.AiInfoList[index].Status = "Terminating"
}
if job.Status.Phase == "Abnormal" {
infoList.AiInfoList[index].Status = "Abnormal"
}
}
}
}
// 同步信息到core端
if len(infoList.AiInfoList) != 0 {
syncInfoReq.AiInfoList = infoList.AiInfoList
svc.PcmCoreRpc.SyncInfo(context.Background(), &syncInfoReq)
}
})
}
func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *CreateTrainingJobLogic) {
for index, _ := range infoList.AiInfoList {
if infoList.AiInfoList[index].Status == "Saved" {
submitReq := modelarts.CreateTrainingJobReq{
Kind: "job",
ProjectId: "0a62ffb0d48026c12fbfc011b8d23f0b",
Metadata: &modelarts.MetadataS{
Name: infoList.AiInfoList[index].Name,
WorkspaceId: "0",
Description: "This is a ModelArts Demo Job",
},
Algorithm: &modelarts.Algorithms{
Command: "echo hello;sleep 100;echo hello;sleep 100;echo hello",
Engine: &modelarts.EngineCreateTraining{
ImageUrl: "jcce/nginx:v1",
},
},
Spec: &modelarts.SpecsC{
Resource: &modelarts.ResourceCreateTraining{
FlavorId: "modelarts.p3.large.public.free",
NodeCount: 1,
},
},
}
jobResult, _ := submitJobLogic.CreateTrainingJob(&submitReq)
if jobResult.Code == 200 {
infoList.AiInfoList[index].Status = jobResult.Status.Phase
infoList.AiInfoList[index].ProjectId = jobResult.Metadata.Id
} else {
infoList.AiInfoList[index].Result = "Failed"
infoList.AiInfoList[index].Result = jobResult.Msg
}
}
}
}
func queryCoreInfoList(svc *svc.ServiceContext) (*pcmcoreclient.InfoListResp, error) {
infoReq := pcmcoreclient.InfoListReq{
Kind: "ai",
ServiceName: "modelArts",
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if err != nil {
return nil, err
}
return infoList, nil
}

View File

@ -6,10 +6,8 @@ import (
"PCM/adaptor/PCM-AI/PCM-MODELARTS/rpc/internal/server"
"PCM/adaptor/PCM-AI/PCM-MODELARTS/rpc/internal/svc"
"PCM/adaptor/PCM-AI/PCM-MODELARTS/rpc/modelarts"
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
commonConfig "PCM/common/config"
"PCM/common/interceptor/rpcserver"
"context"
"flag"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
@ -17,36 +15,12 @@ import (
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"time"
)
var configFile = flag.String("f", "adaptor/PCM-AI/PCM-MODELARTS/rpc/etc/pcmmodelarts.yaml", "the config file")
func main() {
//flag.Parse()
//
//var c config.Config
//conf.MustLoad(*configFile, &c)
//// start log component
//logx.MustSetup(c.LogConf)
//ctx := svc.NewServiceContext(c)
//ctx.Cron.Start()
//s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
// modelarts.RegisterModelArtsServer(grpcServer, server.NewModelArtsServer(ctx))
//
// if c.Mode == service.DevMode || c.Mode == service.TestMode {
// reflection.Register(grpcServer)
// }
//})
//
////rpc log
//s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
//defer s.Stop()
//logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
//initCron(ctx)
//s.Start()
//--------------
flag.Parse()
var bootstrapConfig commonConfig.BootstrapConfig
@ -88,126 +62,6 @@ func main() {
defer s.Stop()
logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
initCron(ctx)
logic.InitCron(ctx)
s.Start()
}
func initCron(svc *svc.ServiceContext) {
submitJobLogic := logic.NewCreateTrainingJobLogic(context.Background(), svc)
listLogic := logic.NewGetListTrainingJobsLogic(context.Background(), svc)
svc.Cron.AddFunc("*/5 * * * * ?", func() {
syncInfoReq := pcmcoreclient.SyncInfoReq{
Kind: "ai",
ServiceName: "modelArts",
}
// 查询core端分发下来的任务列表
infoList, err := queryCoreInfoList(svc)
if err != nil {
logx.Error(err)
return
}
// 提交任务
submitJob(infoList, submitJobLogic)
// 查询运行中的任务列表同步信息
listReq := modelarts.ListTrainingJobsreq{
ProjectId: "0a62ffb0d48026c12fbfc011b8d23f0b",
Limit: 10,
OffSet: 0,
}
listJob, err := listLogic.GetListTrainingJobs(&listReq)
if err != nil {
logx.Error(err)
return
}
for index, _ := range infoList.AiInfoList {
for _, job := range listJob.Items {
if job.Metadata.Name == infoList.AiInfoList[index].Name {
infoList.AiInfoList[index].ProjectId = job.ProjectId
infoList.AiInfoList[index].JobId = job.Metadata.Id
createTime := time.Unix(int64(job.Metadata.CreateTime)/1000, 0)
infoList.AiInfoList[index].CreateTime = time.Time.String(createTime)
if job.Status.StartTime != 0 {
startTime := time.Unix(int64(job.Status.StartTime)/1000, 0)
infoList.AiInfoList[index].StartTime = time.Time.String(startTime)
}
infoList.AiInfoList[index].RunningTime = int64(job.Status.Duration) / 1000
if job.Status.Phase == "Running" {
infoList.AiInfoList[index].Status = "Running"
}
if job.Status.Phase == "Completed" {
infoList.AiInfoList[index].Status = "Completed"
}
if job.Status.Phase == "Creating" {
infoList.AiInfoList[index].Status = "Creating"
}
if job.Status.Phase == "Pending" {
infoList.AiInfoList[index].Status = "Pending"
}
if job.Status.Phase == "Terminated" {
infoList.AiInfoList[index].Status = "Terminated"
}
if job.Status.Phase == "Terminating" {
infoList.AiInfoList[index].Status = "Terminating"
}
if job.Status.Phase == "Abnormal" {
infoList.AiInfoList[index].Status = "Abnormal"
}
}
}
}
// 同步信息到core端
if len(infoList.AiInfoList) != 0 {
syncInfoReq.AiInfoList = infoList.AiInfoList
svc.PcmCoreRpc.SyncInfo(context.Background(), &syncInfoReq)
}
})
}
func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *logic.CreateTrainingJobLogic) {
for index, _ := range infoList.AiInfoList {
if infoList.AiInfoList[index].Status == "Saved" {
submitReq := modelarts.CreateTrainingJobReq{
Kind: "job",
ProjectId: "0a62ffb0d48026c12fbfc011b8d23f0b",
Metadata: &modelarts.MetadataS{
Name: infoList.AiInfoList[index].Name,
WorkspaceId: "0",
Description: "This is a ModelArts Demo Job",
},
Algorithm: &modelarts.Algorithms{
Command: "echo hello;sleep 100;echo hello;sleep 100;echo hello",
Engine: &modelarts.EngineCreateTraining{
ImageUrl: "jcce/nginx:v1",
},
},
Spec: &modelarts.SpecsC{
Resource: &modelarts.ResourceCreateTraining{
FlavorId: "modelarts.p3.large.public.free",
NodeCount: 1,
},
},
}
jobResult, _ := submitJobLogic.CreateTrainingJob(&submitReq)
if jobResult.Code == 200 {
infoList.AiInfoList[index].Status = jobResult.Status.Phase
infoList.AiInfoList[index].ProjectId = jobResult.Metadata.Id
} else {
infoList.AiInfoList[index].Result = "Failed"
infoList.AiInfoList[index].Result = jobResult.Msg
}
}
}
}
func queryCoreInfoList(svc *svc.ServiceContext) (*pcmcoreclient.InfoListResp, error) {
infoReq := pcmcoreclient.InfoListReq{
Kind: "ai",
ServiceName: "modelArts",
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if err != nil {
return nil, err
}
return infoList, nil
}

View File

@ -1,19 +1,16 @@
Name: octopus.rpc
ListenOn: 0.0.0.0:2009
#Hosts:
# - 127.0.0.1:2379
#Key: octopus.rpc
#User: root
#Pass:
Username: "c2net2@pcl.ac.cn"
Password: "c2net123"
HanwujiUrl: "http://192.168.242.41:8001/"
SuiyuanUrl: "http://192.168.242.41:8000/"
SailingsiUrl: "http://192.168.242.41:8002/"
OctopusResouceSpec: "openaiserver/v1/resourcemanage/resourcespec?resourcePool=common-pool"
OctopusTokenUrl: "openaiserver/v1/authmanage/token"
CambriconMLU290: 256 #TOpsAtFp16
EnflameT20: 128 #TOpsAtFp16
NacosConfig:
DataId: pcm-octopus-rpc.yaml
Group: DEFAULT_GROUP
ServerConfigs:
- IpAddr: 10.101.15.7
Port: 8848
# - IpAddr: nacos-headless
# Port: 8848
ClientConfig:
NamespaceId: test_octopus
TimeoutMs: 5000
NotLoadCacheAtStart: true
LogDir:
CacheDir:
LogLevel: debug

View File

@ -28,14 +28,14 @@ func generateTokenMap() map[string]TokenTimePair {
var tokenMap = make(map[string]TokenTimePair)
octopusConfig := config.Cfg
login := Login{
Username: octopusConfig.OctopusConfig.Username,
Password: octopusConfig.OctopusConfig.Password,
Username: octopusConfig.Username,
Password: octopusConfig.Password,
}
jsonStr, _ := json.Marshal(login)
urlMap := map[string]string{
Hanwuji: octopusConfig.OctopusConfig.HanwujiUrl + octopusConfig.OctopusTokenUrl,
Suiyuan: octopusConfig.OctopusConfig.SuiyuanUrl + octopusConfig.OctopusTokenUrl,
Sailingsi: octopusConfig.OctopusConfig.SailingsiUrl + octopusConfig.OctopusTokenUrl,
Hanwuji: octopusConfig.HanwujiUrl + octopusConfig.OctopusTokenUrl,
Suiyuan: octopusConfig.SuiyuanUrl + octopusConfig.OctopusTokenUrl,
Sailingsi: octopusConfig.SailingsiUrl + octopusConfig.OctopusTokenUrl,
}
for k, v := range urlMap {
token, expiredAt := generateToken(jsonStr, v)

View File

@ -1,21 +1,45 @@
package config
import (
commonConfig "PCM/common/config"
"flag"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/zrpc"
)
type Config struct {
zrpc.RpcServerConf
OctopusConfig
LogConf logx.LogConf
}
var configFile = flag.String("o", "adaptor/PCM-AI/PCM-OCTOPUS/rpc/etc/octopus.yaml", "the config file")
var configFile = flag.String("f", "adaptor/PCM-AI/PCM-OCTOPUS/rpc/etc/octopus.yaml", "the config file")
var Cfg = getConfig()
func getConfig() Config {
var bootstrapConfig commonConfig.BootstrapConfig
conf.MustLoad(*configFile, &bootstrapConfig)
//解析业务配置
var c Config
conf.MustLoad(*configFile, &c)
nacosConfig := bootstrapConfig.NacosConfig
serviceConfigContent := nacosConfig.InitConfig(func(data string) {
err := conf.LoadFromYamlBytes([]byte(data), &c)
if err != nil {
panic(err)
}
})
err := conf.LoadFromYamlBytes([]byte(serviceConfigContent), &c)
if err != nil {
panic(err)
}
// start log component
logx.MustSetup(c.LogConf)
// 注册到nacos
nacosConfig.Discovery(&c.RpcServerConf)
return c
}

View File

@ -32,9 +32,9 @@ func (l *GetComputingPowerLogic) GetComputingPower(in *octopus.ResourceReq) (*oc
octopusConfig := config.Cfg
urlMap := map[string]string{
common.Hanwuji: octopusConfig.OctopusConfig.HanwujiUrl + octopusConfig.OctopusResouceSpec,
common.Suiyuan: octopusConfig.OctopusConfig.SuiyuanUrl + octopusConfig.OctopusResouceSpec,
common.Sailingsi: octopusConfig.OctopusConfig.SailingsiUrl + octopusConfig.OctopusResouceSpec,
common.Hanwuji: octopusConfig.HanwujiUrl + octopusConfig.OctopusResouceSpec,
common.Suiyuan: octopusConfig.SuiyuanUrl + octopusConfig.OctopusResouceSpec,
common.Sailingsi: octopusConfig.SailingsiUrl + octopusConfig.OctopusResouceSpec,
}
var computingPowerInTops int32

View File

@ -3,27 +3,23 @@ package main
import (
"PCM/common/interceptor/rpcserver"
"flag"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"PCM/adaptor/PCM-AI/PCM-OCTOPUS/rpc/internal/config"
"PCM/adaptor/PCM-AI/PCM-OCTOPUS/rpc/internal/server"
"PCM/adaptor/PCM-AI/PCM-OCTOPUS/rpc/internal/svc"
"PCM/adaptor/PCM-AI/PCM-OCTOPUS/rpc/octopus"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
var configFile = flag.String("f", "adaptor/PCM-AI/PCM-OCTOPUS/rpc/etc/octopus.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
c := config.Cfg
ctx := svc.NewServiceContext(c)
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
@ -38,8 +34,7 @@ func main() {
s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
defer s.Stop()
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
s.Start()
}

View File

@ -259,3 +259,9 @@ type (
Ydyl bool `json:"ydyl"`
}
)
type (
cpResp {
POpsAtFp16 float32 `json:"pOpsAtFp16"`
}
)

View File

@ -40,6 +40,9 @@ service pcm {
// @handler syncInfoHandler
// get /core/syncInfo (syncInfoReq) returns (syncInfoResp)
@handler getComputingPowerHandler
get /core/getComputingPower returns (cpResp)
}
//hpc二级接口

View File

@ -2,12 +2,14 @@ NacosConfig:
DataId: pcm-core-api.yaml
Group: DEFAULT_GROUP
ServerConfigs:
- IpAddr: 127.0.0.1
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: 10.101.15.7
Port: 8848
- IpAddr: nacos-headless
Port: 8848
ClientConfig:
NamespaceId: test
NamespaceId: test_octopus
TimeoutMs: 5000
NotLoadCacheAtStart: true
LogDir:

View File

@ -0,0 +1,21 @@
package core
import (
"net/http"
"PCM/adaptor/PCM-CORE/api/internal/logic/core"
"PCM/adaptor/PCM-CORE/api/internal/svc"
"github.com/zeromicro/go-zero/rest/httpx"
)
func GetComputingPowerHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
l := core.NewGetComputingPowerLogic(r.Context(), svcCtx)
resp, err := l.GetComputingPower()
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -45,6 +45,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/listRegion",
Handler: core.ListRegionHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/core/getComputingPower",
Handler: core.GetComputingPowerHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)

View File

@ -0,0 +1,35 @@
package core
import (
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC"
"context"
"PCM/adaptor/PCM-CORE/api/internal/svc"
"PCM/adaptor/PCM-CORE/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetComputingPowerLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetComputingPowerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetComputingPowerLogic {
return &GetComputingPowerLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetComputingPowerLogic) GetComputingPower() (resp *types.CpResp, err error) {
apiResp := types.CpResp{}
req := &hpcAC.ResourceReq{}
cpResp, err := l.svcCtx.ACRpc.GetComputingPower(l.ctx, req)
apiResp.POpsAtFp16 = cpResp.POpsAtFp16
return &apiResp, nil
}

View File

@ -29,7 +29,7 @@ func (l *ListCenterLogic) ListCenter() (*types.ListCenterResp, error) {
//var centersModel []model.ComputeCenter
var resp types.ListCenterResp
centersModel, _ = l.svcCtx.CenterOverviewModel.FindAll(l.ctx)
l.svcCtx.DbEngin.Raw("select cc.*, ac.cluster_num, ac.node_num, ac.cpu_num, ac.gpu_num, ac.managed_flops, ac.unmanaged_flops, ac.managed_storage, ac.unmanaged_storage, hc.cluster_num, c.node_num, hc.cpu_num, hc.gpu_num, hc.managed_flops, hc.unmanaged_flops, hc.managed_storage, hc.unmanaged_storage, c.cluster_num, c.node_num, c.cpu_num, c.gpu_num, c.managed_flops, c.unmanaged_flops, c.managed_storage, c.unmanaged_storage, ct.edwc, ct.ydyl\nfrom compute_center cc\nleft join ai_center ac on cc.id = ac.id\nleft join hpc_center hc on cc.id = hc.id\nleft join cloud_center c on cc.id = c.id\nleft join center_tag ct on cc.id = ct.id").Scan(&centersModel)
var centerModelV = *centersModel

View File

@ -1,14 +1,10 @@
package core
import (
"PCM/adaptor/PCM-AI/PCM-MODELARTS/rpc/modelartsclient"
"PCM/adaptor/PCM-CORE/api/internal/svc"
"PCM/adaptor/PCM-CORE/api/internal/types"
"PCM/adaptor/PCM-CORE/model"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcacclient"
"PCM/common/tool"
"context"
"database/sql"
"encoding/json"
"github.com/zeromicro/go-zero/core/logx"
appv1 "k8s.io/api/apps/v1"
@ -58,12 +54,12 @@ func (l *ScheduleTaskLogic) ScheduleTask(req *types.ScheduleTaskReq) (resp *type
}
// save the task in mysql and return id
result, err := l.svcCtx.TaskModel.Insert(l.ctx, &task)
if err != nil {
return nil, err
tx := l.svcCtx.DbEngin.Create(&task)
if tx.Error != nil {
return nil, tx.Error
}
id, _ := result.LastInsertId()
req.TaskId = id
req.TaskId = task.Id
reqMessage, err := json.Marshal(req)
if err != nil {
return nil, err
@ -81,7 +77,7 @@ func (l *ScheduleTaskLogic) ScheduleTask(req *types.ScheduleTaskReq) (resp *type
}
func (l *ScheduleTaskLogic) checkSubmitReq(req *types.ScheduleTaskReq) string {
var rows *sql.Rows
//var rows *sql.Rows
switch req.ServiceName {
case "kubeNative":
//bytes, err := json.Marshal(req.Metadata)
@ -100,20 +96,20 @@ func (l *ScheduleTaskLogic) checkSubmitReq(req *types.ScheduleTaskReq) string {
// }
// }
//}
case "modelArts":
var modelArtsReq modelartsclient.CreateTrainingJobReq
tool.Convert(req.Metadata, &modelArtsReq)
rows, _ = l.svcCtx.Db.Query("select id from ai where project_id = ? and name = ?", modelArtsReq.ProjectId, modelArtsReq.Metadata.Name)
if rows != nil && rows.Next() {
return "data already exists."
}
case "ac":
var acReq *hpcacclient.SubmitJobReq
tool.Convert(req.Metadata, &acReq)
rows, _ = l.svcCtx.Db.Query("select id from hpc where name = ?", acReq.Appname)
if rows != nil && rows.Next() {
return "data already exists."
}
//case "modelArts":
// var modelArtsReq modelartsclient.CreateTrainingJobReq
// tool.Convert(req.Metadata, &modelArtsReq)
// rows, _ = l.svcCtx.Db.Query("select id from ai where project_id = ? and name = ?", modelArtsReq.ProjectId, modelArtsReq.Metadata.Name)
// if rows != nil && rows.Next() {
// return "data already exists."
// }
//case "ac":
// var acReq *hpcacclient.SubmitJobReq
// tool.Convert(req.Metadata, &acReq)
// rows, _ = l.svcCtx.Db.Query("select id from hpc where name = ?", acReq.Appname)
// if rows != nil && rows.Next() {
// return "data already exists."
// }
}
return ""

View File

@ -27,45 +27,18 @@ func NewTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskList
func (l *TaskListLogic) TaskList() (resp *types.TaskListResp, err error) {
resp = &types.TaskListResp{}
// 查询总运行时长
runningTimeRows, err := l.svcCtx.Db.Query("select sum(running_time)/3600 as running_time from (select sum(running_time) as running_time from hpc union all select sum(running_time) as running_time from cloud union all select sum(running_time) as running_time from ai) runtime")
if err != nil {
logx.Error(err)
return nil, err
}
for runningTimeRows.Next() {
runningTimeRows.Scan(&resp.Data.TotalRunTime)
tx := l.svcCtx.DbEngin.Raw("select sum(running_time)/3600 as total_run_time from (select sum(running_time) as running_time from hpc union all select sum(running_time) as running_time from cloud union all select sum(running_time) as running_time from ai) runtime").Scan(&resp.Data.TotalRunTime)
if tx.Error != nil {
}
// 查询任务数据
var tasks []model.Task
err = l.svcCtx.SqlConn.QueryRows(&tasks, "select * from task where deleted_flag = 0")
if err != nil {
tx = l.svcCtx.DbEngin.Find(&tasks)
if tx.Error != nil {
logx.Error(err)
return nil, err
}
// 查询超算数据
var hpcInfos []model.Hpc
err = l.svcCtx.SqlConn.QueryRows(&hpcInfos, "select * from hpc where deleted_flag = 0")
if err != nil {
logx.Error(err)
return nil, err
}
// 查询智算数据
var aiInfos []model.Ai
err = l.svcCtx.SqlConn.QueryRows(&aiInfos, "select * from ai where deleted_flag = 0")
if err != nil {
logx.Error(err)
return nil, err
}
// 查询云算数据
var cloudInfos []model.Cloud
err = l.svcCtx.SqlConn.QueryRows(&cloudInfos, "select * from cloud where deleted_flag = 0")
if err != nil {
logx.Error(err)
return nil, err
return nil, tx.Error
}
for _, task := range tasks {
resp.Data.Tasks = append(resp.Data.Tasks, types.Task{
ServiceName: task.ServiceName,
Name: task.Name,
@ -75,13 +48,11 @@ func (l *TaskListLogic) TaskList() (resp *types.TaskListResp, err error) {
})
}
// 运行卡时数
rows, err := l.svcCtx.Db.Query("select SUM(running_time * card_count)/3600 from hpc where deleted_flag = 0")
if err != nil {
return nil, err
}
for rows.Next() {
rows.Scan(&resp.Data.CardTime)
tx = l.svcCtx.DbEngin.Model(&model.Hpc{}).Select("(CASE WHEN SUM(running_time * card_count)/3600 IS NULL THEN 0 ELSE SUM(running_time * card_count)/3600 END )as cardTime").Find(&resp.Data.CardTime)
if tx.Error != nil {
return nil, tx.Error
}
// 运行任务合计数
resp.Data.TotalCount = len(tasks)
resp.Code = 200

View File

@ -3,7 +3,7 @@ package kq
import (
"PCM/adaptor/PCM-CORE/api/internal/svc"
"PCM/adaptor/PCM-CORE/api/internal/types"
"PCM/common/param"
"PCM/adaptor/PCM-CORE/model"
"PCM/common/tool"
"context"
"encoding/json"
@ -30,13 +30,13 @@ func (l *ScheduleAiMq) Consume(_, val string) error {
// 接受消息
var req *types.ScheduleTaskReq
json.Unmarshal([]byte(val), &req)
var aiBaseList []param.AiBase
tool.Convert(req.Metadata, &aiBaseList)
for index, _ := range aiBaseList {
aiBaseList[index].TaskId = req.TaskId
aiBaseList[index].Status = "Saved"
var aiList []model.Ai
tool.Convert(req.Metadata, &aiList)
for index, _ := range aiList {
aiList[index].TaskId = req.TaskId
aiList[index].Status = "Saved"
// 解析超算信息以yaml形式存储到数据库中
jsonBytes, err := json.Marshal(aiBaseList[index])
jsonBytes, err := json.Marshal(aiList[index])
if err != nil {
return err
}
@ -44,12 +44,12 @@ func (l *ScheduleAiMq) Consume(_, val string) error {
if err != nil {
return err
}
aiBaseList[index].YamlString = string(bytes)
aiList[index].YamlString = string(bytes)
}
// 存储数据
_, err := l.svcCtx.Db.NamedExec("insert into ai (task_id,service_name,name,status,project_id) values (:task_id,:service_name,:name,:status,:project_id)", aiBaseList)
if err != nil {
return err
tx := l.svcCtx.DbEngin.Create(aiList)
if tx.Error != nil {
return tx.Error
}
return nil
}

View File

@ -7,6 +7,7 @@ import (
"PCM/common/tool"
"context"
"encoding/json"
"github.com/zeromicro/go-zero/core/logx"
)
/*
@ -26,17 +27,11 @@ func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *Schedu
}
func (l *ScheduleCloudMq) Consume(_, val string) error {
var req *types.ScheduleTaskReq
json.Unmarshal([]byte(val), &req)
// 构建提交作业到云算的结构体
var yamlArray []interface{}
bytes, err := json.Marshal(req.Metadata)
json.Unmarshal(bytes, &yamlArray)
if err != nil {
return err
}
tool.Convert(req.Metadata, &yamlArray)
var clouds []model.Cloud
for _, yaml := range yamlArray {
bytes, err := json.Marshal(yaml)
@ -49,9 +44,10 @@ func (l *ScheduleCloudMq) Consume(_, val string) error {
}
// 存储数据
_, err = l.svcCtx.Db.NamedExec("insert into cloud (task_id,kind,namespace,name,api_version,status,service_name,yaml_string) values (:task_id,:kind,:namespace,:name,:api_version,:status,:service_name,:yaml_string)", clouds)
if err != nil {
return err
tx := l.svcCtx.DbEngin.Create(&clouds)
if tx.Error != nil {
logx.Error(tx.Error)
return tx.Error
}
return nil

View File

@ -3,7 +3,7 @@ package kq
import (
"PCM/adaptor/PCM-CORE/api/internal/svc"
"PCM/adaptor/PCM-CORE/api/internal/types"
"PCM/common/param"
"PCM/adaptor/PCM-CORE/model"
"PCM/common/tool"
"context"
"encoding/json"
@ -30,13 +30,13 @@ func (l *ScheduleHpcMq) Consume(_, val string) error {
// 接受消息
var req *types.ScheduleTaskReq
json.Unmarshal([]byte(val), &req)
var hpcBaseList []param.HpcBase
tool.Convert(req.Metadata, &hpcBaseList)
for index, _ := range hpcBaseList {
hpcBaseList[index].TaskId = req.TaskId
hpcBaseList[index].Status = "Saved"
var hpcList []model.Hpc
tool.Convert(req.Metadata, &hpcList)
for index, _ := range hpcList {
hpcList[index].TaskId = req.TaskId
hpcList[index].Status = "Saved"
// 解析超算信息以yaml形式存储到数据库中
jsonBytes, err := json.Marshal(hpcBaseList[index])
jsonBytes, err := json.Marshal(hpcList[index])
if err != nil {
return err
}
@ -44,12 +44,12 @@ func (l *ScheduleHpcMq) Consume(_, val string) error {
if err != nil {
return err
}
hpcBaseList[index].YamlString = string(bytes)
hpcList[index].YamlString = string(bytes)
}
// 存储数据
_, err := l.svcCtx.Db.NamedExec("insert into hpc (task_id,service_name,card_count,name,work_dir,wall_time,status) values (:task_id,:service_name,:card_count,:name,:work_dir,:wall_time,:status)", hpcBaseList)
if err != nil {
return err
tx := l.svcCtx.DbEngin.Create(hpcList)
if tx.Error != nil {
return tx.Error
}
return nil
}

View File

@ -3,14 +3,11 @@ package svc
import (
"PCM/adaptor/PCM-AI/PCM-MODELARTS/rpc/modelartsclient"
"PCM/adaptor/PCM-CORE/api/internal/config"
"PCM/adaptor/PCM-CORE/model"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcacclient"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/hpcthclient"
"github.com/go-redis/redis/v8"
sql "github.com/jmoiron/sqlx"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/zrpc"
"gorm.io/driver/mysql"
"gorm.io/gorm"
@ -19,14 +16,10 @@ import (
type ServiceContext struct {
Config config.Config
ScheduleHpcClient *kq.Pusher
RedisClient *redis.Client
ScheduleHpcClient *kq.Pusher
ScheduleCloudClient *kq.Pusher
ScheduleAiClient *kq.Pusher
TaskModel model.TaskModel
CenterOverviewModel model.CenterOverviewModel
SqlConn sqlx.SqlConn
Db *sql.DB
Cron *cron.Cron
ModelArtsRpc modelartsclient.ModelArts
DbEngin *gorm.DB
@ -35,8 +28,6 @@ type ServiceContext struct {
}
func NewServiceContext(c config.Config) *ServiceContext {
sqlConn := sqlx.NewMysql(c.DB.DataSource)
db, _ := sql.Open("mysql", c.DB.DataSource)
//启动Gorm支持
dbEngin, _ := gorm.Open(mysql.Open(c.DB.DataSource), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
@ -47,14 +38,10 @@ func NewServiceContext(c config.Config) *ServiceContext {
Cron: cron.New(cron.WithSeconds()),
DbEngin: dbEngin,
Config: c,
Db: db,
RedisClient: redis.NewClient(&redis.Options{
Addr: c.Redis.Host,
Password: c.Redis.Pass,
}),
SqlConn: sqlx.NewMysql(c.DB.DataSource),
TaskModel: model.NewTaskModel(sqlConn, c.Cache),
CenterOverviewModel: model.NewCenterOverviewModel(sqlConn, c.Cache),
ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic),
ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic),
ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic),

View File

@ -241,6 +241,10 @@ type Center struct {
Ydyl bool `json:"ydyl"`
}
type CpResp struct {
POpsAtFp16 float32 `json:"pOpsAtFp16"`
}
type Job struct {
SlurmVersion string `json:"slurmVersion"`
Account string `json:"account"`

View File

@ -5,58 +5,19 @@ import (
"PCM/adaptor/PCM-CORE/api/internal/handler"
kqMq "PCM/adaptor/PCM-CORE/api/internal/mqs/kq"
"PCM/adaptor/PCM-CORE/api/internal/svc"
"PCM/adaptor/PCM-CORE/model"
commonConfig "PCM/common/config"
"context"
"flag"
"github.com/go-redis/redis/v8"
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/rest"
)
var configFile = flag.String("f", "adaptor/PCM-CORE/api/etc/pcm.yaml", "the config file")
func main() {
//flag.Parse()
//
//var c config.Config
//conf.MustLoad(*configFile, &c)
//
//serviceGroup := service.NewServiceGroup()
//defer serviceGroup.Stop()
//
//server := rest.MustNewServer(c.RestConf)
//
//ctx := svc.NewServiceContext(c)
//// start log component
//logx.MustSetup(c.LogConf)
//ctx.Cron.Start()
//handler.RegisterHandlers(server, ctx)
//
//serviceGroup.Add(server)
//services := []service.Service{
// //Listening for changes in consumption flow status
// kq.MustNewQueue(c.HpcConsumerConf, kqMq.NewScheduleHpcMq(context.Background(), ctx)),
// kq.MustNewQueue(c.CloudConsumerConf, kqMq.NewScheduleCloudMq(context.Background(), ctx)),
// kq.MustNewQueue(c.AiConsumerConf, kqMq.NewScheduleAiMq(context.Background(), ctx)),
//
// //.....
//}
//for _, mq := range services {
// serviceGroup.Add(mq)
//}
//
//// 初始化数据到缓存
//initRedisData(ctx.SqlConn, ctx.RedisClient)
//logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port)
//serviceGroup.Start()
//----------------------
flag.Parse()
var bootstrapConfig commonConfig.BootstrapConfig
@ -104,23 +65,7 @@ func main() {
serviceGroup.Add(mq)
}
// 初始化数据到缓存
initRedisData(ctx.SqlConn, ctx.RedisClient)
logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port)
serviceGroup.Start()
}
func initRedisData(sql sqlx.SqlConn, redisClient *redis.Client) {
// 查询出字典数据列表
var dictList []model.Dict
err := sql.QueryRows(&dictList, "select * from dict")
if err != nil {
return
}
for _, dict := range dictList {
redisClient.Set(context.Background(), dict.DictValue, dict.DictCode, 0)
}
}

View File

@ -7,7 +7,6 @@ import (
"database/sql"
"fmt"
"strings"
"time"
"github.com/zeromicro/go-zero/core/stores/builder"
"github.com/zeromicro/go-zero/core/stores/sqlc"
@ -37,22 +36,22 @@ type (
}
Ai struct {
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
ProjectId string `db:"project_id"` // 项目id
Name string `db:"name"` // 名称
Status string `db:"status"` // 状态
StartTime string `db:"start_time"` // 开始时间
RunningTime string `db:"running_time"` // 运行时间
CreatedBy string `db:"created_by"` // 创建人
CreatedTime time.Time `db:"created_time"` // 创建时间
UpdatedBy string `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
ServiceName string `db:"service_name"`
Result sql.NullString `db:"result"`
YamlString string `db:"yaml_string"`
JobId string `db:"job_id"`
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
ProjectId string `db:"project_id"` // 项目id
Name string `db:"name"` // 名称
Status string `db:"status"` // 状态
StartTime string `db:"start_time"` // 开始时间
RunningTime string `db:"running_time"` // 运行时间
CreatedBy string `db:"created_by"` // 创建人
CreatedTime sql.NullTime `db:"created_time"` // 创建时间
UpdatedBy string `db:"updated_by"` // 更新人
UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
ServiceName string `db:"service_name"`
Result string `db:"result"`
YamlString string `db:"yaml_string"`
JobId string `db:"job_id"`
}
)

View File

@ -7,7 +7,6 @@ import (
"database/sql"
"fmt"
"strings"
"time"
"github.com/zeromicro/go-zero/core/stores/builder"
"github.com/zeromicro/go-zero/core/stores/sqlc"
@ -37,23 +36,23 @@ type (
}
Cloud struct {
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
ApiVersion string `db:"api_version"`
Name string `db:"name"` // 名称
Namespace string `db:"namespace"` // 命名空间
Kind string `db:"kind"` // 种类
Status string `db:"status"` // 状态
StartTime string `db:"start_time"` // 开始时间
RunningTime int64 `db:"running_time"` // 运行时长
CreatedBy int64 `db:"created_by"` // 创建人
CreatedTime time.Time `db:"created_time"` // 创建时间
UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
ServiceName string `db:"service_name"`
YamlString string `db:"yaml_string"`
Result sql.NullString `db:"result"`
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
ApiVersion string `db:"api_version"`
Name string `db:"name"` // 名称
Namespace string `db:"namespace"` // 命名空间
Kind string `db:"kind"` // 种类
Status string `db:"status"` // 状态
StartTime string `db:"start_time"` // 开始时间
RunningTime int64 `db:"running_time"` // 运行时长
CreatedBy int64 `db:"created_by"` // 创建人
CreatedTime sql.NullTime `db:"created_time"` // 创建时间
UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
ServiceName string `db:"service_name"`
YamlString string `db:"yaml_string"`
Result string `db:"result"`
}
)

View File

@ -7,7 +7,6 @@ import (
"database/sql"
"fmt"
"strings"
"time"
"github.com/zeromicro/go-zero/core/stores/builder"
"github.com/zeromicro/go-zero/core/stores/sqlc"
@ -37,24 +36,24 @@ type (
}
Hpc struct {
Id int64 `db:"id"` // id
TaskId sql.NullInt64 `db:"task_id"` // 任务id
JobId sql.NullString `db:"job_id"` // 作业id
ServiceName sql.NullString `db:"service_name"` // 服务名称
Name sql.NullString `db:"name"` // 名称
Status sql.NullString `db:"status"` // 状态
StartTime sql.NullString `db:"start_time"` // 开始时间
RunningTime sql.NullInt64 `db:"running_time"` // 运行时间
CardCount sql.NullInt64 `db:"card_count"` // 卡数
CreatedBy sql.NullInt64 `db:"created_by"` // 创建人
CreatedTime time.Time `db:"created_time"` // 创建时间
UpdatedBy sql.NullInt64 `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
WorkDir sql.NullString `db:"work_dir"`
WallTime sql.NullString `db:"wall_time"`
Result sql.NullString `db:"result"`
YamlString sql.NullString `db:"yaml_string"`
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
JobId string `db:"job_id"` // 作业id
ServiceName string `db:"service_name"` // 服务名称
Name string `db:"name"` // 名称
Status string `db:"status"` // 状态
StartTime string `db:"start_time"` // 开始时间
RunningTime int64 `db:"running_time"` // 运行时间
CardCount int64 `db:"card_count"` // 卡数
CreatedBy int64 `db:"created_by"` // 创建人
CreatedTime sql.NullTime `db:"created_time"` // 创建时间
UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
WorkDir string `db:"work_dir"`
WallTime string `db:"wall_time"`
Result string `db:"result"`
YamlString string `db:"yaml_string"`
}
)

View File

@ -2,7 +2,9 @@ NacosConfig:
DataId: pcm-core-rpc.yaml
Group: DEFAULT_GROUP
ServerConfigs:
- IpAddr: 127.0.0.1
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: 10.101.15.7
Port: 8848
- IpAddr: nacos-headless
Port: 8848

View File

@ -4,6 +4,7 @@ import (
"PCM/adaptor/PCM-CORE/model"
"PCM/adaptor/PCM-CORE/rpc/internal/svc"
"PCM/adaptor/PCM-CORE/rpc/pcmCore"
"PCM/common/tool"
"context"
"github.com/zeromicro/go-zero/core/logx"
@ -29,59 +30,32 @@ func (l *InfoListLogic) InfoList(in *pcmCore.InfoListReq) (*pcmCore.InfoListResp
// 查询云智超中的数据列表
switch in.Kind {
case "hpc":
rows, err := l.svcCtx.Db.Query("select task_id,name,status,work_dir,wall_time from hpc where service_name = ? and status not in ('Succeed', 'Completed')", in.ServiceName)
if err != nil {
return nil, err
var hpcModelList []model.Hpc
tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ServiceName, []string{"Succeed", "Completed"}).Find(&hpcModelList)
if tx.Error != nil {
return nil, tx.Error
}
for rows.Next() {
var hpc model.Hpc
rows.Scan(&hpc.TaskId, &hpc.Name, &hpc.Status, &hpc.WorkDir, &hpc.WallTime)
hpcInfo := pcmCore.HpcInfo{
TaskId: hpc.TaskId.Int64,
Name: hpc.Name.String,
Status: hpc.Status.String,
WorkDir: hpc.WorkDir.String,
WallTime: hpc.WallTime.String,
}
result.HpcInfoList = append(result.HpcInfoList, &hpcInfo)
}
var hpcInfoList []*pcmCore.HpcInfo
tool.Convert(hpcModelList, &hpcInfoList)
result.HpcInfoList = hpcInfoList
case "cloud":
rows, err := l.svcCtx.Db.Query("select task_id,namespace,name,status,yaml_string from cloud where service_name = ? and status not in ('Succeed', 'Completed')", in.ServiceName)
if err != nil {
return nil, err
}
for rows.Next() {
var cloud model.Cloud
rows.Scan(&cloud.TaskId, &cloud.Namespace, &cloud.Name, &cloud.Status, &cloud.YamlString)
var cloudInfo pcmCore.CloudInfo
cloudInfo = pcmCore.CloudInfo{
TaskId: cloud.TaskId,
Namespace: cloud.Namespace,
Name: cloud.Name,
Status: cloud.Status,
YamlString: cloud.YamlString,
}
result.CloudInfoList = append(result.CloudInfoList, &cloudInfo)
var cloudModelList []model.Cloud
tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ServiceName, []string{"Succeed", "Completed"}).Find(&cloudModelList)
if tx.Error != nil {
return nil, tx.Error
}
var cloudInfoList []*pcmCore.CloudInfo
tool.Convert(cloudModelList, &cloudInfoList)
result.CloudInfoList = cloudInfoList
case "ai":
rows, err := l.svcCtx.Db.Query("select task_id,name,status,project_id,job_id from ai where service_name = ? and status not in ('Succeed', 'Completed')", in.ServiceName)
if err != nil {
return nil, err
}
for rows.Next() {
var ai model.Ai
rows.Scan(&ai.TaskId, &ai.Name, &ai.Status, &ai.ProjectId, &ai.JobId)
var aiInfo pcmCore.AiInfo
aiInfo = pcmCore.AiInfo{
TaskId: ai.TaskId,
ProjectId: ai.ProjectId,
Name: ai.Name,
Status: ai.Status,
JobId: ai.JobId,
}
result.AiInfoList = append(result.AiInfoList, &aiInfo)
var aiModelList []model.AiModel
tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ServiceName, []string{"Succeed", "Completed"}).Find(&aiModelList)
if tx.Error != nil {
return nil, tx.Error
}
var aiInfoList []*pcmCore.AiInfo
tool.Convert(aiModelList, &aiInfoList)
result.AiInfoList = aiInfoList
}
return &result, nil
}

View File

@ -33,24 +33,24 @@ func NewSyncInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SyncInfo
// SyncInfo Synchronous data information
func (l *SyncInfoLogic) SyncInfo(in *pcmCore.SyncInfoReq) (*pcmCore.SyncInfoResp, error) {
conn, err := l.svcCtx.Db.Begin()
if err != nil {
return nil, err
db := l.svcCtx.DbEngin.Begin()
if db.Error != nil {
return nil, db.Error
}
switch in.Kind {
case "cloud":
for _, cloudInfo := range in.CloudInfoList {
_, err = conn.Exec("update cloud set status = ?,start_time = ?,running_time = ? where service_name = ? and task_id = ? and namespace = ? and name = ?",
db.Exec("update cloud set status = ?,start_time = ?,running_time = ? where service_name = ? and task_id = ? and namespace = ? and name = ?",
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.RunningTime, in.ServiceName, cloudInfo.TaskId, cloudInfo.Namespace, cloudInfo.Name)
}
case "hpc":
for _, hpcInfo := range in.HpcInfoList {
_, err = conn.Exec("update hpc set status = ?,start_time = ?,running_time = ?,job_id = ? where service_name = ? and task_id = ? and name = ?",
db.Exec("update hpc set status = ?,start_time = ?,running_time = ?,job_id = ? where service_name = ? and task_id = ? and name = ?",
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, in.ServiceName, hpcInfo.TaskId, hpcInfo.Name)
}
case "ai":
for _, aiInfo := range in.AiInfoList {
_, err = conn.Exec("update ai set status = ?,start_time = ?,running_time = ?,project_id = ?,job_id = ?,created_time = ? where service_name = ? and task_id = ? and name = ?",
db.Exec("update ai set status = ?,start_time = ?,running_time = ?,project_id = ?,job_id = ?,created_time = ? where service_name = ? and task_id = ? and name = ?",
aiInfo.Status, aiInfo.StartTime, aiInfo.RunningTime, aiInfo.ProjectId, aiInfo.JobId, aiInfo.CreateTime, in.ServiceName, aiInfo.TaskId, aiInfo.Name)
}
}
@ -58,13 +58,13 @@ func (l *SyncInfoLogic) SyncInfo(in *pcmCore.SyncInfoReq) (*pcmCore.SyncInfoResp
// 执行回滚或者提交操作
defer func() {
if p := recover(); p != nil {
conn.Rollback()
db.Rollback()
logx.Error(p)
} else if err != nil {
} else if db.Error != nil {
logx.Info("rollback")
conn.Rollback()
db.Rollback()
} else {
err = conn.Commit()
db = db.Commit()
logx.Info("commit success")
}
}()

View File

@ -3,18 +3,27 @@ package svc
import (
"PCM/adaptor/PCM-CORE/rpc/internal/config"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
)
type ServiceContext struct {
Config config.Config
Db *sqlx.DB
Config config.Config
DbEngin *gorm.DB
}
func NewServiceContext(c config.Config) *ServiceContext {
db, _ := sqlx.Open("mysql", c.DB.DataSource)
//启动Gorm支持
dbEngin, _ := gorm.Open(mysql.Open(c.DB.DataSource), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
SingularTable: true, // 使用单数表名,启用该选项,此时,`User` 的表名应该是 `t_user`
},
Logger: logger.Default.LogMode(logger.Info),
})
return &ServiceContext{
Config: c,
Db: db,
Config: c,
DbEngin: dbEngin,
}
}

View File

@ -19,29 +19,6 @@ import (
var configFile = flag.String("f", "adaptor/PCM-CORE/rpc/etc/pcmcore.yaml", "the config file")
func main() {
//flag.Parse()
//
//var c config.Config
//conf.MustLoad(*configFile, &c)
//ctx := svc.NewServiceContext(c)
//
//s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
// pcmCore.RegisterPcmCoreServer(grpcServer, server.NewPcmCoreServer(ctx))
//
// if c.Mode == service.DevMode || c.Mode == service.TestMode {
// reflection.Register(grpcServer)
// }
//})
//
////rpc log
//s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
//
//defer s.Stop()
//
//fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
//s.Start()
//-------
flag.Parse()

View File

@ -2,12 +2,14 @@ NacosConfig:
DataId: pcm-ac-rpc.yaml
Group: DEFAULT_GROUP
ServerConfigs:
- IpAddr: 127.0.0.1
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: 10.101.15.7
Port: 8848
- IpAddr: nacos-headless
Port: 8848
ClientConfig:
NamespaceId: test
NamespaceId: test_octopus
TimeoutMs: 5000
NotLoadCacheAtStart: true
LogDir:

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,7 @@
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: hpcAC.proto
// source: pb/hpcAC.proto
package hpcAC
@ -55,6 +55,8 @@ type HpcACClient interface {
GetACToken(ctx context.Context, in *ACTokenReq, opts ...grpc.CallOption) (*TokenResp, error)
// 曙光ac获取clusterid
GetACClusterId(ctx context.Context, in *ACClusterReq, opts ...grpc.CallOption) (*ClusterResp, error)
// 获取曙光账号算力
GetComputingPower(ctx context.Context, in *ResourceReq, opts ...grpc.CallOption) (*CpResp, error)
}
type hpcACClient struct {
@ -227,6 +229,15 @@ func (c *hpcACClient) GetACClusterId(ctx context.Context, in *ACClusterReq, opts
return out, nil
}
func (c *hpcACClient) GetComputingPower(ctx context.Context, in *ResourceReq, opts ...grpc.CallOption) (*CpResp, error) {
out := new(CpResp)
err := c.cc.Invoke(ctx, "/hpcAC.hpcAC/GetComputingPower", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// HpcACServer is the server API for HpcAC service.
// All implementations must embed UnimplementedHpcACServer
// for forward compatibility
@ -264,6 +275,8 @@ type HpcACServer interface {
GetACToken(context.Context, *ACTokenReq) (*TokenResp, error)
// 曙光ac获取clusterid
GetACClusterId(context.Context, *ACClusterReq) (*ClusterResp, error)
// 获取曙光账号算力
GetComputingPower(context.Context, *ResourceReq) (*CpResp, error)
mustEmbedUnimplementedHpcACServer()
}
@ -325,6 +338,9 @@ func (UnimplementedHpcACServer) GetACToken(context.Context, *ACTokenReq) (*Token
func (UnimplementedHpcACServer) GetACClusterId(context.Context, *ACClusterReq) (*ClusterResp, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetACClusterId not implemented")
}
func (UnimplementedHpcACServer) GetComputingPower(context.Context, *ResourceReq) (*CpResp, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetComputingPower not implemented")
}
func (UnimplementedHpcACServer) mustEmbedUnimplementedHpcACServer() {}
// UnsafeHpcACServer may be embedded to opt out of forward compatibility for this service.
@ -662,6 +678,24 @@ func _HpcAC_GetACClusterId_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler)
}
func _HpcAC_GetComputingPower_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ResourceReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HpcACServer).GetComputingPower(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/hpcAC.hpcAC/GetComputingPower",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HpcACServer).GetComputingPower(ctx, req.(*ResourceReq))
}
return interceptor(ctx, in, info, handler)
}
// HpcAC_ServiceDesc is the grpc.ServiceDesc for HpcAC service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -741,7 +775,11 @@ var HpcAC_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetACClusterId",
Handler: _HpcAC_GetACClusterId_Handler,
},
{
MethodName: "GetComputingPower",
Handler: _HpcAC_GetComputingPower_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "hpcAC.proto",
Metadata: "pb/hpcAC.proto",
}

View File

@ -1,7 +1,6 @@
package main
import (
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/config"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/logic"
@ -9,8 +8,6 @@ import (
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/svc"
commonConfig "PCM/common/config"
"PCM/common/interceptor/rpcserver"
"PCM/common/tool"
"context"
"flag"
"fmt"
"github.com/zeromicro/go-zero/core/conf"
@ -90,99 +87,10 @@ func main() {
s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
defer s.Stop()
// 初始化定时任务
logic.InitCron(ctx)
logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
initCron(ctx)
s.Start()
}
func initCron(svc *svc.ServiceContext) {
submitJobLogic := logic.NewSubmitJobLogic(context.Background(), svc)
listLogic := logic.NewListJobLogic(context.Background(), svc)
svc.Cron.AddFunc("*/5 * * * * ?", func() {
syncInfoReq := pcmcoreclient.SyncInfoReq{
Kind: "hpc",
ServiceName: "ac",
}
// 查询core端分发下来的任务列表
infoList, err := queryCoreInfoList(svc)
if err != nil {
logx.Error(err)
return
}
// 提交任务
submitJob(infoList, submitJobLogic)
// 查询运行中的任务列表同步信息
listReq := hpcAC.ListJobReq{}
listJob, err := listLogic.ListJob(&listReq)
if err != nil {
logx.Error(err)
return
}
for index, _ := range infoList.HpcInfoList {
for _, job := range listJob.Jobs {
if job.JobName == infoList.HpcInfoList[index].Name {
infoList.HpcInfoList[index].JobId = job.JobId
infoList.HpcInfoList[index].StartTime = job.JobStartTime
infoList.HpcInfoList[index].RunningTime = int64(tool.RunTimeToSeconds(job.JobRunTime))
if job.JobStatus == "statR" {
infoList.HpcInfoList[index].Status = "Running"
}
if job.JobStatus == "statC" {
infoList.HpcInfoList[index].Status = "Completed"
}
}
}
}
// 同步信息到core端
if len(infoList.HpcInfoList) != 0 {
syncInfoReq.HpcInfoList = infoList.HpcInfoList
svc.PcmCoreRpc.SyncInfo(context.Background(), &syncInfoReq)
}
})
}
func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *logic.SubmitJobLogic) {
for index, _ := range infoList.HpcInfoList {
if infoList.HpcInfoList[index].Status == "Saved" {
submitReq := hpcAC.SubmitJobReq{
Appname: "BASE",
Apptype: "BASIC",
StrJobManagerID: 1638523853,
MapAppJobInfo: &hpcAC.MapAppJobInfo{
GAP_CMD_FILE: "sleep 10",
GAP_NNODE: "1",
GAP_SUBMIT_TYPE: "cmd",
GAP_JOB_NAME: infoList.HpcInfoList[index].Name,
GAP_WORK_DIR: infoList.HpcInfoList[index].WorkDir,
GAP_QUEUE: "debug2",
GAP_NPROC: "1",
GAP_APPNAME: "BASE",
GAP_WALL_TIME: infoList.HpcInfoList[index].WallTime,
GAP_STD_OUT_FILE: "/public/home/zhijiang/test/testjob1/std.out.%j",
GAP_STD_ERR_FILE: " /public/home/zhijiang/test/testjob1/std.err.%j",
},
}
jobResult, _ := submitJobLogic.SubmitJob(&submitReq)
if jobResult.Code == "0" {
infoList.HpcInfoList[index].Status = "Pending"
infoList.HpcInfoList[index].JobId = jobResult.Data
} else {
infoList.HpcInfoList[index].Result = "Failed"
infoList.HpcInfoList[index].Result = jobResult.Msg
}
}
}
}
func queryCoreInfoList(svc *svc.ServiceContext) (*pcmcoreclient.InfoListResp, error) {
infoReq := pcmcoreclient.InfoListReq{
Kind: "hpc",
ServiceName: "ac",
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if err != nil {
return nil, err
}
return infoList, nil
}

View File

@ -21,6 +21,7 @@ type (
ACTokenResp = hpcAC.ACTokenResp
ACTokenState = hpcAC.ACTokenState
ClusterResp = hpcAC.ClusterResp
CpResp = hpcAC.CpResp
CpuCore = hpcAC.CpuCore
CpuCoreReq = hpcAC.CpuCoreReq
CpuCoreResp = hpcAC.CpuCoreResp
@ -63,6 +64,7 @@ type (
QueueReq = hpcAC.QueueReq
QueueResp = hpcAC.QueueResp
QuotaData = hpcAC.QuotaData
ResourceReq = hpcAC.ResourceReq
SubmitJobReq = hpcAC.SubmitJobReq
SubmitJobResp = hpcAC.SubmitJobResp
TokenResp = hpcAC.TokenResp
@ -105,6 +107,8 @@ type (
GetACToken(ctx context.Context, in *ACTokenReq, opts ...grpc.CallOption) (*TokenResp, error)
// 曙光ac获取clusterid
GetACClusterId(ctx context.Context, in *ACClusterReq, opts ...grpc.CallOption) (*ClusterResp, error)
// 获取曙光账号算力
GetComputingPower(ctx context.Context, in *ResourceReq, opts ...grpc.CallOption) (*CpResp, error)
}
defaultHpcAC struct {
@ -222,3 +226,9 @@ func (m *defaultHpcAC) GetACClusterId(ctx context.Context, in *ACClusterReq, opt
client := hpcAC.NewHpcACClient(m.cli.Conn())
return client.GetACClusterId(ctx, in, opts...)
}
// 获取曙光账号算力
func (m *defaultHpcAC) GetComputingPower(ctx context.Context, in *ResourceReq, opts ...grpc.CallOption) (*CpResp, error) {
client := hpcAC.NewHpcACClient(m.cli.Conn())
return client.GetComputingPower(ctx, in, opts...)
}

View File

@ -0,0 +1,114 @@
package common
type CenterResp struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data struct {
Id int `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
IngressUrls []interface{} `json:"ingressUrls"`
EfileUrls []struct {
NodeName string `json:"nodeName"`
Enable string `json:"enable"`
FastTransEnable string `json:"fastTransEnable"`
UdpPort string `json:"udpPort"`
Version string `json:"version"`
Url string `json:"url"`
AppId string `json:"appId,omitempty"`
AppSecret string `json:"appSecret,omitempty"`
IsManagerNode string `json:"isManagerNode,omitempty"`
} `json:"efileUrls"`
EshellUrls []struct {
Enable string `json:"enable"`
Version string `json:"version"`
Url string `json:"url"`
NodeName string `json:"nodeName,omitempty"`
AppId string `json:"appId,omitempty"`
AppSecret string `json:"appSecret,omitempty"`
FastTransEnable string `json:"fastTransEnable,omitempty"`
UdpPort string `json:"udpPort,omitempty"`
IsManagerNode string `json:"isManagerNode,omitempty"`
} `json:"eshellUrls"`
HpcUrls []struct {
Enable string `json:"enable"`
IsManagerNode string `json:"isManagerNode"`
Version string `json:"version"`
Url string `json:"url"`
} `json:"hpcUrls"`
AiUrls []struct {
Enable string `json:"enable"`
Version string `json:"version"`
Url string `json:"url"`
} `json:"aiUrls"`
EshellSshHosts []struct {
Enable string `json:"enable"`
Url string `json:"url"`
NodeName string `json:"nodeName,omitempty"`
AppId string `json:"appId,omitempty"`
AppSecret string `json:"appSecret,omitempty"`
FastTransEnable string `json:"fastTransEnable,omitempty"`
UdpPort string `json:"udpPort,omitempty"`
IsManagerNode string `json:"isManagerNode,omitempty"`
Version string `json:"version,omitempty"`
} `json:"eshellSshHosts"`
InternetSshHosts []struct {
LimitState string `json:"limitState"`
Url string `json:"url"`
} `json:"internetSshHosts"`
ClusterUserInfo struct {
UserName string `json:"userName"`
HomePath string `json:"homePath"`
} `json:"clusterUserInfo"`
} `json:"data"`
}
type TokenResp struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data []struct {
ClusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`
Token string `json:"token"`
} `json:"data"`
}
type QuotaResp struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data struct {
UserName interface{} `json:"userName"`
AccountName interface{} `json:"accountName"`
UserMaxCpu int `json:"userMaxCpu"`
UserMaxDcu int `json:"userMaxDcu"`
UserMaxGpu int `json:"userMaxGpu"`
UserMaxMlu int `json:"userMaxMlu"`
UserMaxMem int `json:"userMaxMem"`
UserMaxNode int `json:"userMaxNode"`
UserMaxSubmitJob int `json:"userMaxSubmitJob"`
UserMaxRunJob int `json:"userMaxRunJob"`
AccountMaxCpu int `json:"accountMaxCpu"`
AccountMaxDcu int `json:"accountMaxDcu"`
AccountMaxGpu int `json:"accountMaxGpu"`
AccountMaxMlu int `json:"accountMaxMlu"`
AccountMaxMem int `json:"accountMaxMem"`
AccountMaxNode int `json:"accountMaxNode"`
AccountMaxSubmitJob int `json:"accountMaxSubmitJob"`
AccountMaxRunJob int `json:"accountMaxRunJob"`
UserMinCpu int `json:"userMinCpu"`
UserMinNode int `json:"userMinNode"`
MaxWallTime int `json:"maxWallTime"`
} `json:"data"`
}
type ClusterResp struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data []struct {
JobManagerAddr string `json:"JobManagerAddr"`
JobManagerType string `json:"JobManagerType"`
Id int `json:"id"`
Text string `json:"text"`
JobManagerPort string `json:"JobManagerPort"`
} `json:"data"`
}

View File

@ -10,4 +10,5 @@ type Config struct {
ShuguangConf
PcmCoreRpcConf zrpc.RpcClientConf
LogConf logx.LogConf
CPConf
}

View File

@ -13,3 +13,14 @@ type ShuguangConf struct {
Token string `json:"Token"`
ClusterID string `json:"ClusterID"`
}
// 算力相关配置
type CPConf struct {
AcBaseUrl string
AuthUrl string
ClusUrl string
CenterUrl string
UserLimitUrl string
Dcu float32
CpuCore float32
}

View File

@ -0,0 +1,101 @@
package logic
import (
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/svc"
"PCM/common/tool"
"context"
"github.com/zeromicro/go-zero/core/logx"
)
func InitCron(svc *svc.ServiceContext) {
submitJobLogic := NewSubmitJobLogic(context.Background(), svc)
listLogic := NewListJobLogic(context.Background(), svc)
svc.Cron.AddFunc("*/5 * * * * ?", func() {
syncInfoReq := pcmcoreclient.SyncInfoReq{
Kind: "hpc",
ServiceName: "ac",
}
// 查询core端分发下来的任务列表
infoList, err := queryCoreInfoList(svc)
if err != nil {
logx.Error(err)
return
}
// 提交任务
submitJob(infoList, submitJobLogic)
// 查询运行中的任务列表同步信息
listReq := hpcAC.ListJobReq{}
listJob, err := listLogic.ListJob(&listReq)
if err != nil {
logx.Error(err)
return
}
for index, _ := range infoList.HpcInfoList {
for _, job := range listJob.Jobs {
if job.JobName == infoList.HpcInfoList[index].Name {
infoList.HpcInfoList[index].JobId = job.JobId
infoList.HpcInfoList[index].StartTime = job.JobStartTime
infoList.HpcInfoList[index].RunningTime = int64(tool.RunTimeToSeconds(job.JobRunTime))
if job.JobStatus == "statR" {
infoList.HpcInfoList[index].Status = "Running"
}
if job.JobStatus == "statC" {
infoList.HpcInfoList[index].Status = "Completed"
}
}
}
}
// 同步信息到core端
if len(infoList.HpcInfoList) != 0 {
syncInfoReq.HpcInfoList = infoList.HpcInfoList
svc.PcmCoreRpc.SyncInfo(context.Background(), &syncInfoReq)
}
})
}
func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *SubmitJobLogic) {
for index, _ := range infoList.HpcInfoList {
if infoList.HpcInfoList[index].Status == "Saved" {
submitReq := hpcAC.SubmitJobReq{
Appname: "BASE",
Apptype: "BASIC",
StrJobManagerID: 1638523853,
MapAppJobInfo: &hpcAC.MapAppJobInfo{
GAP_CMD_FILE: "sleep 10",
GAP_NNODE: "1",
GAP_SUBMIT_TYPE: "cmd",
GAP_JOB_NAME: infoList.HpcInfoList[index].Name,
GAP_WORK_DIR: infoList.HpcInfoList[index].WorkDir,
GAP_QUEUE: "debug2",
GAP_NPROC: "1",
GAP_APPNAME: "BASE",
GAP_WALL_TIME: infoList.HpcInfoList[index].WallTime,
GAP_STD_OUT_FILE: "/public/home/zhijiang/test/testjob1/std.out.%j",
GAP_STD_ERR_FILE: " /public/home/zhijiang/test/testjob1/std.err.%j",
},
}
jobResult, _ := submitJobLogic.SubmitJob(&submitReq)
if jobResult.Code == "0" {
infoList.HpcInfoList[index].Status = "Pending"
infoList.HpcInfoList[index].JobId = jobResult.Data
} else {
infoList.HpcInfoList[index].Result = "Failed"
infoList.HpcInfoList[index].Result = jobResult.Msg
}
}
}
}
func queryCoreInfoList(svc *svc.ServiceContext) (*pcmcoreclient.InfoListResp, error) {
infoReq := pcmcoreclient.InfoListReq{
Kind: "hpc",
ServiceName: "ac",
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if err != nil {
return nil, err
}
return infoList, nil
}

View File

@ -0,0 +1,31 @@
package logic
import (
"context"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type GetComputingPowerLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewGetComputingPowerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetComputingPowerLogic {
return &GetComputingPowerLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// 获取曙光账号算力
func (l *GetComputingPowerLogic) GetComputingPower(in *hpcAC.ResourceReq) (*hpcAC.CpResp, error) {
// todo: add your logic here and delete this line
return &hpcAC.CpResp{}, nil
}

View File

@ -126,3 +126,9 @@ func (s *HpcACServer) GetACClusterId(ctx context.Context, in *hpcAC.ACClusterReq
l := logic.NewGetACClusterIdLogic(ctx, s.svcCtx)
return l.GetACClusterId(in)
}
// 获取曙光账号算力
func (s *HpcACServer) GetComputingPower(ctx context.Context, in *hpcAC.ResourceReq) (*hpcAC.CpResp, error) {
l := logic.NewGetComputingPowerLogic(ctx, s.svcCtx)
return l.GetComputingPower(in)
}

View File

@ -606,6 +606,14 @@ message ACClusterData {
string jobManagerPort = 5;
}
//
message resourceReq{
}
message cpResp{
float pOpsAtFp16 = 1;
}
// HPC Services for AC
service hpcAC {
@ -660,4 +668,7 @@ service hpcAC {
//ac获取clusterid
rpc GetACClusterId(ACClusterReq) returns(ClusterResp);
//
rpc GetComputingPower(resourceReq) returns (cpResp);
}

View File

@ -2,7 +2,7 @@ NacosConfig:
DataId: pcm-th-rpc.yaml
Group: DEFAULT_GROUP
ServerConfigs:
- IpAddr: 127.0.0.1
- IpAddr: 10.101.15.7
Port: 8848
ClientConfig:
NamespaceId: test

View File

@ -2,7 +2,9 @@ NacosConfig:
DataId: pcm-kubenative-rpc.yaml
Group: DEFAULT_GROUP
ServerConfigs:
- IpAddr: 127.0.0.1
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: 10.101.15.7
Port: 8848
- IpAddr: nacos-headless
Port: 8848

View File

@ -0,0 +1,75 @@
package logic
import (
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/internal/svc"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/kubenative"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/kubenativeclient"
"PCM/common/tool"
"context"
"github.com/zeromicro/go-zero/core/logx"
v1 "k8s.io/api/apps/v1"
"time"
)
func InitCron(svc *svc.ServiceContext) {
svc.Cron.AddFunc("*/5 * * * * ?", func() {
SyncInfoReq := pcmcoreclient.SyncInfoReq{
Kind: "cloud",
ServiceName: "kubeNative",
}
// 查询core端分发下来的任务列表
infoReq := pcmcoreclient.InfoListReq{
Kind: "cloud",
ServiceName: "kubeNative",
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if err != nil {
logx.Error(err)
return
}
// 提交任务
applyYamlLogic := NewApplyYamlLogic(context.Background(), svc)
for index, _ := range infoList.CloudInfoList {
applyReq := kubenativeclient.ApplyReq{
YamlString: infoList.CloudInfoList[index].YamlString,
}
infoList.CloudInfoList[index].Status = "Pending"
_, err := applyYamlLogic.ApplyYaml(&applyReq)
if err != nil {
return
}
}
// 查询Deployment列表
listLogic := NewListLogic(context.Background(), svc)
listReq := kubenative.ListReq{
YamlString: "apiVersion: apps/v1\nkind: Deployment\n",
}
resp, err := listLogic.List(&listReq)
var deploymentList v1.DeploymentList
tool.K8sUnstructured(resp.Data, &deploymentList)
if err != nil {
return
}
// 遍历core端任务列表信息
for index, _ := range infoList.CloudInfoList {
for _, deployment := range deploymentList.Items {
if deployment.Namespace == infoList.CloudInfoList[index].Namespace && deployment.Name == infoList.CloudInfoList[index].Name {
infoList.CloudInfoList[index].StartTime = tool.TimeRemoveZone(deployment.Status.Conditions[0].LastTransitionTime.Time).String()
infoList.CloudInfoList[index].RunningTime = time.Now().Sub(deployment.Status.Conditions[0].LastTransitionTime.Time).Milliseconds() / 1000
// 判断状态
if deployment.Status.ReadyReplicas == deployment.Status.Replicas {
infoList.CloudInfoList[index].Status = "running"
} else {
infoList.CloudInfoList[index].Status = "pending"
}
}
}
}
// 同步信息到core端
SyncInfoReq.CloudInfoList = infoList.CloudInfoList
svc.PcmCoreRpc.SyncInfo(context.Background(), &SyncInfoReq)
})
}

View File

@ -1,22 +1,15 @@
package main
import (
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/internal/logic"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/kubenativeclient"
commonConfig "PCM/common/config"
"PCM/common/interceptor/rpcserver"
"PCM/common/tool"
"context"
"flag"
"github.com/zeromicro/go-zero/core/logx"
v1 "k8s.io/api/apps/v1"
"time"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/internal/config"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/internal/logic"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/internal/server"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/internal/svc"
"PCM/adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/kubenative"
commonConfig "PCM/common/config"
"PCM/common/interceptor/rpcserver"
"flag"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
@ -28,27 +21,7 @@ import (
var configFile = flag.String("f", "adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/etc/kubenative.yaml", "the config file")
func main() {
//flag.Parse()
//
//var c config.Config
//conf.MustLoad(*configFile, &c)
//// start log component
//logx.MustSetup(c.LogConf)
//ctx := svc.NewServiceContext(c)
//ctx.Cron.Start()
//s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
// kubenative.RegisterKubeNativeServer(grpcServer, server.NewKubeNativeServer(ctx))
//
// if c.Mode == service.DevMode || c.Mode == service.TestMode {
// reflection.Register(grpcServer)
// }
//})
//defer s.Stop()
//initCron(ctx)
//logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
//s.Start()
//------------
flag.Parse()
var bootstrapConfig commonConfig.BootstrapConfig
@ -90,68 +63,7 @@ func main() {
defer s.Stop()
logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
initCron(ctx)
// 初始化定时任务
logic.InitCron(ctx)
s.Start()
}
func initCron(svc *svc.ServiceContext) {
svc.Cron.AddFunc("*/5 * * * * ?", func() {
SyncInfoReq := pcmcoreclient.SyncInfoReq{
Kind: "cloud",
ServiceName: "kubeNative",
}
// 查询core端分发下来的任务列表
infoReq := pcmcoreclient.InfoListReq{
Kind: "cloud",
ServiceName: "kubeNative",
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if err != nil {
logx.Error(err)
return
}
// 提交任务
applyYamlLogic := logic.NewApplyYamlLogic(context.Background(), svc)
for index, _ := range infoList.CloudInfoList {
applyReq := kubenativeclient.ApplyReq{
YamlString: infoList.CloudInfoList[index].YamlString,
}
infoList.CloudInfoList[index].Status = "Pending"
_, err := applyYamlLogic.ApplyYaml(&applyReq)
if err != nil {
return
}
}
// 查询Deployment列表
listLogic := logic.NewListLogic(context.Background(), svc)
listReq := kubenative.ListReq{
YamlString: "apiVersion: apps/v1\nkind: Deployment\n",
}
resp, err := listLogic.List(&listReq)
var deploymentList v1.DeploymentList
tool.K8sUnstructured(resp.Data, &deploymentList)
if err != nil {
return
}
// 遍历core端任务列表信息
for index, _ := range infoList.CloudInfoList {
for _, deployment := range deploymentList.Items {
if deployment.Namespace == infoList.CloudInfoList[index].Namespace && deployment.Name == infoList.CloudInfoList[index].Name {
infoList.CloudInfoList[index].StartTime = tool.TimeRemoveZone(deployment.Status.Conditions[0].LastTransitionTime.Time).String()
infoList.CloudInfoList[index].RunningTime = time.Now().Sub(deployment.Status.Conditions[0].LastTransitionTime.Time).Milliseconds() / 1000
// 判断状态
if deployment.Status.ReadyReplicas == deployment.Status.Replicas {
infoList.CloudInfoList[index].Status = "running"
} else {
infoList.CloudInfoList[index].Status = "pending"
}
}
}
}
// 同步信息到core端
SyncInfoReq.CloudInfoList = infoList.CloudInfoList
svc.PcmCoreRpc.SyncInfo(context.Background(), &SyncInfoReq)
})
}

View File

@ -0,0 +1,553 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.19.4
// source: pcm-ceph.proto
package ceph
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// *****************screen storage Start************************
type StorageScreenReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *StorageScreenReq) Reset() {
*x = StorageScreenReq{}
if protoimpl.UnsafeEnabled {
mi := &file_pcm_ceph_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StorageScreenReq) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StorageScreenReq) ProtoMessage() {}
func (x *StorageScreenReq) ProtoReflect() protoreflect.Message {
mi := &file_pcm_ceph_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StorageScreenReq.ProtoReflect.Descriptor instead.
func (*StorageScreenReq) Descriptor() ([]byte, []int) {
return file_pcm_ceph_proto_rawDescGZIP(), []int{0}
}
type AiCenterInfos struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // @gotags: copier:"id"
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` // @gotags: copier:"name"
Desc string `protobuf:"bytes,3,opt,name=desc,proto3" json:"desc,omitempty"` // @gotags: copier:"name"
Resource string `protobuf:"bytes,4,opt,name=resource,proto3" json:"resource,omitempty"`
TrainJob string `protobuf:"bytes,5,opt,name=trainJob,proto3" json:"trainJob,omitempty"`
ComputeScale int32 `protobuf:"varint,6,opt,name=computeScale,proto3" json:"computeScale,omitempty"`
StorageScale int32 `protobuf:"varint,7,opt,name=storageScale,proto3" json:"storageScale,omitempty"` // @gotags: copier:"storageScale"
Province string `protobuf:"bytes,8,opt,name=province,proto3" json:"province,omitempty"`
City string `protobuf:"bytes,9,opt,name=city,proto3" json:"city,omitempty"` // @gotags: copier:"city"
CoordinateX int32 `protobuf:"varint,10,opt,name=coordinateX,proto3" json:"coordinateX,omitempty"`
CoordinateY int32 `protobuf:"varint,11,opt,name=coordinateY,proto3" json:"coordinateY,omitempty"`
Type int32 `protobuf:"varint,12,opt,name=type,proto3" json:"type,omitempty"`
Weight int32 `protobuf:"varint,13,opt,name=weight,proto3" json:"weight,omitempty"`
ConnectionState int32 `protobuf:"varint,14,opt,name=connectionState,proto3" json:"connectionState,omitempty"` // @gotags: copier:"connectionState"
BusyState int32 `protobuf:"varint,15,opt,name=busyState,proto3" json:"busyState,omitempty"`
ImageUrl string `protobuf:"bytes,16,opt,name=imageUrl,proto3" json:"imageUrl,omitempty"`
AccDevices string `protobuf:"bytes,17,opt,name=accDevices,proto3" json:"accDevices,omitempty"`
MarketTime int64 `protobuf:"varint,18,opt,name=marketTime,proto3" json:"marketTime,omitempty"`
CreatedAt int64 `protobuf:"varint,19,opt,name=createdAt,proto3" json:"createdAt,omitempty"`
AccessTime int32 `protobuf:"varint,20,opt,name=accessTime,proto3" json:"accessTime,omitempty"`
CardRunTime int32 `protobuf:"varint,21,opt,name=cardRunTime,proto3" json:"cardRunTime,omitempty"`
JobCount int32 `protobuf:"varint,22,opt,name=jobCount,proto3" json:"jobCount,omitempty"`
}
func (x *AiCenterInfos) Reset() {
*x = AiCenterInfos{}
if protoimpl.UnsafeEnabled {
mi := &file_pcm_ceph_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *AiCenterInfos) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AiCenterInfos) ProtoMessage() {}
func (x *AiCenterInfos) ProtoReflect() protoreflect.Message {
mi := &file_pcm_ceph_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AiCenterInfos.ProtoReflect.Descriptor instead.
func (*AiCenterInfos) Descriptor() ([]byte, []int) {
return file_pcm_ceph_proto_rawDescGZIP(), []int{1}
}
func (x *AiCenterInfos) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *AiCenterInfos) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *AiCenterInfos) GetDesc() string {
if x != nil {
return x.Desc
}
return ""
}
func (x *AiCenterInfos) GetResource() string {
if x != nil {
return x.Resource
}
return ""
}
func (x *AiCenterInfos) GetTrainJob() string {
if x != nil {
return x.TrainJob
}
return ""
}
func (x *AiCenterInfos) GetComputeScale() int32 {
if x != nil {
return x.ComputeScale
}
return 0
}
func (x *AiCenterInfos) GetStorageScale() int32 {
if x != nil {
return x.StorageScale
}
return 0
}
func (x *AiCenterInfos) GetProvince() string {
if x != nil {
return x.Province
}
return ""
}
func (x *AiCenterInfos) GetCity() string {
if x != nil {
return x.City
}
return ""
}
func (x *AiCenterInfos) GetCoordinateX() int32 {
if x != nil {
return x.CoordinateX
}
return 0
}
func (x *AiCenterInfos) GetCoordinateY() int32 {
if x != nil {
return x.CoordinateY
}
return 0
}
func (x *AiCenterInfos) GetType() int32 {
if x != nil {
return x.Type
}
return 0
}
func (x *AiCenterInfos) GetWeight() int32 {
if x != nil {
return x.Weight
}
return 0
}
func (x *AiCenterInfos) GetConnectionState() int32 {
if x != nil {
return x.ConnectionState
}
return 0
}
func (x *AiCenterInfos) GetBusyState() int32 {
if x != nil {
return x.BusyState
}
return 0
}
func (x *AiCenterInfos) GetImageUrl() string {
if x != nil {
return x.ImageUrl
}
return ""
}
func (x *AiCenterInfos) GetAccDevices() string {
if x != nil {
return x.AccDevices
}
return ""
}
func (x *AiCenterInfos) GetMarketTime() int64 {
if x != nil {
return x.MarketTime
}
return 0
}
func (x *AiCenterInfos) GetCreatedAt() int64 {
if x != nil {
return x.CreatedAt
}
return 0
}
func (x *AiCenterInfos) GetAccessTime() int32 {
if x != nil {
return x.AccessTime
}
return 0
}
func (x *AiCenterInfos) GetCardRunTime() int32 {
if x != nil {
return x.CardRunTime
}
return 0
}
func (x *AiCenterInfos) GetJobCount() int32 {
if x != nil {
return x.JobCount
}
return 0
}
type StorageScreenResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
TotalSize int32 `protobuf:"varint,1,opt,name=totalSize,proto3" json:"totalSize,omitempty"` // @gotags: copier:"totalSize"
AiCenterInfos []*AiCenterInfos `protobuf:"bytes,2,rep,name=aiCenterInfos,proto3" json:"aiCenterInfos,omitempty"` // @gotags: copier:"aiCenterInfos"
StorageUsed float32 `protobuf:"fixed32,3,opt,name=StorageUsed,proto3" json:"StorageUsed,omitempty"`
StorageUsing float32 `protobuf:"fixed32,4,opt,name=StorageUsing,proto3" json:"StorageUsing,omitempty"`
UsageRate float32 `protobuf:"fixed32,5,opt,name=UsageRate,proto3" json:"UsageRate,omitempty"`
UsingRate float32 `protobuf:"fixed32,6,opt,name=UsingRate,proto3" json:"UsingRate,omitempty"`
Code int32 `protobuf:"varint,7,opt,name=code,proto3" json:"code,omitempty"` // @gotags: copier:"Code"
Msg string `protobuf:"bytes,8,opt,name=msg,proto3" json:"msg,omitempty"` // @gotags: copier:"Msg"
ErrorMsg string `protobuf:"bytes,9,opt,name=error_msg,json=errorMsg,proto3" json:"error_msg,omitempty"` // @gotags: copier:"ErrorMsg"
}
func (x *StorageScreenResp) Reset() {
*x = StorageScreenResp{}
if protoimpl.UnsafeEnabled {
mi := &file_pcm_ceph_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StorageScreenResp) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StorageScreenResp) ProtoMessage() {}
func (x *StorageScreenResp) ProtoReflect() protoreflect.Message {
mi := &file_pcm_ceph_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StorageScreenResp.ProtoReflect.Descriptor instead.
func (*StorageScreenResp) Descriptor() ([]byte, []int) {
return file_pcm_ceph_proto_rawDescGZIP(), []int{2}
}
func (x *StorageScreenResp) GetTotalSize() int32 {
if x != nil {
return x.TotalSize
}
return 0
}
func (x *StorageScreenResp) GetAiCenterInfos() []*AiCenterInfos {
if x != nil {
return x.AiCenterInfos
}
return nil
}
func (x *StorageScreenResp) GetStorageUsed() float32 {
if x != nil {
return x.StorageUsed
}
return 0
}
func (x *StorageScreenResp) GetStorageUsing() float32 {
if x != nil {
return x.StorageUsing
}
return 0
}
func (x *StorageScreenResp) GetUsageRate() float32 {
if x != nil {
return x.UsageRate
}
return 0
}
func (x *StorageScreenResp) GetUsingRate() float32 {
if x != nil {
return x.UsingRate
}
return 0
}
func (x *StorageScreenResp) GetCode() int32 {
if x != nil {
return x.Code
}
return 0
}
func (x *StorageScreenResp) GetMsg() string {
if x != nil {
return x.Msg
}
return ""
}
func (x *StorageScreenResp) GetErrorMsg() string {
if x != nil {
return x.ErrorMsg
}
return ""
}
var File_pcm_ceph_proto protoreflect.FileDescriptor
var file_pcm_ceph_proto_rawDesc = []byte{
0x0a, 0x0e, 0x70, 0x63, 0x6d, 0x2d, 0x63, 0x65, 0x70, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x12, 0x08, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x22, 0x12, 0x0a, 0x10, 0x53, 0x74,
0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x63, 0x72, 0x65, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x22, 0x87,
0x05, 0x0a, 0x0d, 0x41, 0x69, 0x43, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x73,
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64,
0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x65, 0x73, 0x63, 0x18, 0x03, 0x20, 0x01,
0x28, 0x09, 0x52, 0x04, 0x64, 0x65, 0x73, 0x63, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f,
0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f,
0x75, 0x72, 0x63, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x72, 0x61, 0x69, 0x6e, 0x4a, 0x6f, 0x62,
0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x72, 0x61, 0x69, 0x6e, 0x4a, 0x6f, 0x62,
0x12, 0x22, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x65, 0x53, 0x63, 0x61, 0x6c, 0x65,
0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x65, 0x53,
0x63, 0x61, 0x6c, 0x65, 0x12, 0x22, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53,
0x63, 0x61, 0x6c, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x73, 0x74, 0x6f, 0x72,
0x61, 0x67, 0x65, 0x53, 0x63, 0x61, 0x6c, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76,
0x69, 0x6e, 0x63, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76,
0x69, 0x6e, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x69, 0x74, 0x79, 0x18, 0x09, 0x20, 0x01,
0x28, 0x09, 0x52, 0x04, 0x63, 0x69, 0x74, 0x79, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6f, 0x72,
0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x58, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63,
0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x58, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f,
0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x59, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52,
0x0b, 0x63, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x59, 0x12, 0x12, 0x0a, 0x04,
0x74, 0x79, 0x70, 0x65, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65,
0x12, 0x16, 0x0a, 0x06, 0x77, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x05,
0x52, 0x06, 0x77, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, 0x28, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e,
0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x18, 0x0e, 0x20, 0x01, 0x28,
0x05, 0x52, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61,
0x74, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x62, 0x75, 0x73, 0x79, 0x53, 0x74, 0x61, 0x74, 0x65, 0x18,
0x0f, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x62, 0x75, 0x73, 0x79, 0x53, 0x74, 0x61, 0x74, 0x65,
0x12, 0x1a, 0x0a, 0x08, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x55, 0x72, 0x6c, 0x18, 0x10, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x55, 0x72, 0x6c, 0x12, 0x1e, 0x0a, 0x0a,
0x61, 0x63, 0x63, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, 0x11, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x61, 0x63, 0x63, 0x44, 0x65, 0x76, 0x69, 0x63, 0x65, 0x73, 0x12, 0x1e, 0x0a, 0x0a,
0x6d, 0x61, 0x72, 0x6b, 0x65, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x12, 0x20, 0x01, 0x28, 0x03,
0x52, 0x0a, 0x6d, 0x61, 0x72, 0x6b, 0x65, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09,
0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x18, 0x13, 0x20, 0x01, 0x28, 0x03, 0x52,
0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x61, 0x63,
0x63, 0x65, 0x73, 0x73, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x14, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a,
0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x61,
0x72, 0x64, 0x52, 0x75, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x15, 0x20, 0x01, 0x28, 0x05, 0x52,
0x0b, 0x63, 0x61, 0x72, 0x64, 0x52, 0x75, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08,
0x6a, 0x6f, 0x62, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x16, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08,
0x6a, 0x6f, 0x62, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0xb5, 0x02, 0x0a, 0x11, 0x53, 0x74, 0x6f,
0x72, 0x61, 0x67, 0x65, 0x53, 0x63, 0x72, 0x65, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x12, 0x1c,
0x0a, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x05, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x3d, 0x0a, 0x0d,
0x61, 0x69, 0x43, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x18, 0x02, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x2e, 0x41,
0x69, 0x43, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x52, 0x0d, 0x61, 0x69,
0x43, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x53,
0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x55, 0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x02,
0x52, 0x0b, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x55, 0x73, 0x65, 0x64, 0x12, 0x22, 0x0a,
0x0c, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x55, 0x73, 0x69, 0x6e, 0x67, 0x18, 0x04, 0x20,
0x01, 0x28, 0x02, 0x52, 0x0c, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x55, 0x73, 0x69, 0x6e,
0x67, 0x12, 0x1c, 0x0a, 0x09, 0x55, 0x73, 0x61, 0x67, 0x65, 0x52, 0x61, 0x74, 0x65, 0x18, 0x05,
0x20, 0x01, 0x28, 0x02, 0x52, 0x09, 0x55, 0x73, 0x61, 0x67, 0x65, 0x52, 0x61, 0x74, 0x65, 0x12,
0x1c, 0x0a, 0x09, 0x55, 0x73, 0x69, 0x6e, 0x67, 0x52, 0x61, 0x74, 0x65, 0x18, 0x06, 0x20, 0x01,
0x28, 0x02, 0x52, 0x09, 0x55, 0x73, 0x69, 0x6e, 0x67, 0x52, 0x61, 0x74, 0x65, 0x12, 0x12, 0x0a,
0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x6f, 0x64,
0x65, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
0x6d, 0x73, 0x67, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67,
0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67,
0x32, 0x50, 0x0a, 0x04, 0x43, 0x65, 0x70, 0x68, 0x12, 0x48, 0x0a, 0x0d, 0x73, 0x74, 0x6f, 0x72,
0x61, 0x67, 0x65, 0x53, 0x63, 0x72, 0x65, 0x65, 0x6e, 0x12, 0x1a, 0x2e, 0x74, 0x65, 0x6d, 0x70,
0x6c, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x63, 0x72, 0x65,
0x65, 0x6e, 0x52, 0x65, 0x71, 0x1a, 0x1b, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65,
0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x63, 0x72, 0x65, 0x65, 0x6e, 0x52, 0x65,
0x73, 0x70, 0x42, 0x07, 0x5a, 0x05, 0x2f, 0x63, 0x65, 0x70, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
}
var (
file_pcm_ceph_proto_rawDescOnce sync.Once
file_pcm_ceph_proto_rawDescData = file_pcm_ceph_proto_rawDesc
)
func file_pcm_ceph_proto_rawDescGZIP() []byte {
file_pcm_ceph_proto_rawDescOnce.Do(func() {
file_pcm_ceph_proto_rawDescData = protoimpl.X.CompressGZIP(file_pcm_ceph_proto_rawDescData)
})
return file_pcm_ceph_proto_rawDescData
}
var file_pcm_ceph_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_pcm_ceph_proto_goTypes = []interface{}{
(*StorageScreenReq)(nil), // 0: template.StorageScreenReq
(*AiCenterInfos)(nil), // 1: template.AiCenterInfos
(*StorageScreenResp)(nil), // 2: template.StorageScreenResp
}
var file_pcm_ceph_proto_depIdxs = []int32{
1, // 0: template.StorageScreenResp.aiCenterInfos:type_name -> template.AiCenterInfos
0, // 1: template.Ceph.storageScreen:input_type -> template.StorageScreenReq
2, // 2: template.Ceph.storageScreen:output_type -> template.StorageScreenResp
2, // [2:3] is the sub-list for method output_type
1, // [1:2] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_pcm_ceph_proto_init() }
func file_pcm_ceph_proto_init() {
if File_pcm_ceph_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pcm_ceph_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StorageScreenReq); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pcm_ceph_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AiCenterInfos); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pcm_ceph_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StorageScreenResp); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pcm_ceph_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pcm_ceph_proto_goTypes,
DependencyIndexes: file_pcm_ceph_proto_depIdxs,
MessageInfos: file_pcm_ceph_proto_msgTypes,
}.Build()
File_pcm_ceph_proto = out.File
file_pcm_ceph_proto_rawDesc = nil
file_pcm_ceph_proto_goTypes = nil
file_pcm_ceph_proto_depIdxs = nil
}

View File

@ -0,0 +1,105 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: pcm-ceph.proto
package ceph
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// CephClient is the client API for Ceph service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type CephClient interface {
StorageScreen(ctx context.Context, in *StorageScreenReq, opts ...grpc.CallOption) (*StorageScreenResp, error)
}
type cephClient struct {
cc grpc.ClientConnInterface
}
func NewCephClient(cc grpc.ClientConnInterface) CephClient {
return &cephClient{cc}
}
func (c *cephClient) StorageScreen(ctx context.Context, in *StorageScreenReq, opts ...grpc.CallOption) (*StorageScreenResp, error) {
out := new(StorageScreenResp)
err := c.cc.Invoke(ctx, "/template.Ceph/storageScreen", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// CephServer is the server API for Ceph service.
// All implementations must embed UnimplementedCephServer
// for forward compatibility
type CephServer interface {
StorageScreen(context.Context, *StorageScreenReq) (*StorageScreenResp, error)
mustEmbedUnimplementedCephServer()
}
// UnimplementedCephServer must be embedded to have forward compatible implementations.
type UnimplementedCephServer struct {
}
func (UnimplementedCephServer) StorageScreen(context.Context, *StorageScreenReq) (*StorageScreenResp, error) {
return nil, status.Errorf(codes.Unimplemented, "method StorageScreen not implemented")
}
func (UnimplementedCephServer) mustEmbedUnimplementedCephServer() {}
// UnsafeCephServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to CephServer will
// result in compilation errors.
type UnsafeCephServer interface {
mustEmbedUnimplementedCephServer()
}
func RegisterCephServer(s grpc.ServiceRegistrar, srv CephServer) {
s.RegisterService(&Ceph_ServiceDesc, srv)
}
func _Ceph_StorageScreen_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StorageScreenReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CephServer).StorageScreen(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/template.Ceph/storageScreen",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CephServer).StorageScreen(ctx, req.(*StorageScreenReq))
}
return interceptor(ctx, in, info, handler)
}
// Ceph_ServiceDesc is the grpc.ServiceDesc for Ceph service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Ceph_ServiceDesc = grpc.ServiceDesc{
ServiceName: "template.Ceph",
HandlerType: (*CephServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "storageScreen",
Handler: _Ceph_StorageScreen_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pcm-ceph.proto",
}

View File

@ -0,0 +1,38 @@
// Code generated by goctl. DO NOT EDIT.
// Source: pcm-ceph.proto
package cephclient
import (
"context"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/ceph"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc"
)
type (
AiCenterInfos = ceph.AiCenterInfos
StorageScreenReq = ceph.StorageScreenReq
StorageScreenResp = ceph.StorageScreenResp
Ceph interface {
StorageScreen(ctx context.Context, in *StorageScreenReq, opts ...grpc.CallOption) (*StorageScreenResp, error)
}
defaultCeph struct {
cli zrpc.Client
}
)
func NewCeph(cli zrpc.Client) Ceph {
return &defaultCeph{
cli: cli,
}
}
func (m *defaultCeph) StorageScreen(ctx context.Context, in *StorageScreenReq, opts ...grpc.CallOption) (*StorageScreenResp, error) {
client := ceph.NewCephClient(m.cli.Conn())
return client.StorageScreen(ctx, in, opts...)
}

View File

@ -0,0 +1,10 @@
Name: ceph.rpc
ListenOn: 0.0.0.0:2004
#Hosts:
# - 127.0.0.1:2379
#Key: octopus.rpc
#User: root
#Pass:
PengChengUrl: "http://grampus.openi.org.cn/openapi/v1/sharescreen/"

View File

@ -0,0 +1,17 @@
Name: pcmceph.rpc
ListenOn: 0.0.0.0:2005
Etcd:
Hosts:
- 10.101.15.170:31890
Key: pcmceph.rpc
User: root
Pass: I9wLvrRufj
#Hosts:
# - 127.0.0.1:2379
#Key: pcmceph.rpc
#User: root
#Pass:
ScreenUrl: "http://grampus.openi.org.cn/openapi/v1/sharescreen/"

View File

@ -0,0 +1,21 @@
package config
import (
"flag"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/zrpc"
)
type Config struct {
zrpc.RpcServerConf
ScreenConfig
}
var configFile = flag.String("o", "adaptor/PCM-STORAGE/PCM-CEPH/rpc/etc/pcmceph.yaml", "the config file")
var Cfg = getConfig()
func getConfig() Config {
var c Config
conf.MustLoad(*configFile, &c)
return c
}

View File

@ -0,0 +1,5 @@
package config
type ScreenConfig struct {
ScreenUrl string
}

View File

@ -0,0 +1,114 @@
package logic
import (
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/internal/config"
"PCM/common/tool"
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/json"
"strings"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/ceph"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type StorageScreenLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
/*type StorageArg struct {
totalSize int32 `json:"totalSize"`
aiCenterInfos []AiCenterInfos `json:"aiCenterInfos"`
}
type AiCenterInfos struct {
id string `json:"id"`
name string `json:"name"`
/* desc string
resource string
trainJob string
computeScale int32
storageScale int32 `json:"storageScale"`
/* province string
city string
coordinateX int32
coordinateY int32
weight int32
connectionState int32
busyState int32
imageUrl string
accDevices string
marketTime int64
createdAt int64
accessTime int32
cardRunTime int32
jobCount int32
}*/
func NewStorageScreenLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StorageScreenLogic {
return &StorageScreenLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// get modelarts Token
func (l *StorageScreenLogic) StorageScreen(in *ceph.StorageScreenReq) (*ceph.StorageScreenResp, error) {
// todo: add your logic here and delete this line
var resp ceph.StorageScreenResp
//var storageArg StorageArg
screenConfig := config.Cfg
screenUrl := screenConfig.ScreenConfig.ScreenUrl
statusCode, body, err := tool.HttpClientWithScreen(tool.GET, screenUrl+"aicenter?sortBy=weight&orderBy=asc", strings.NewReader(``))
if err != nil {
return nil, err
}
if statusCode == 200 {
err := json.Unmarshal(body, &resp)
if err != nil {
fmt.Println(err)
}
var usedStorageScale int32 //已使用
var usingStorageScale int32 //未使用
for _, item := range resp.AiCenterInfos {
if item.ConnectionState == 3 {
usedStorageScale += item.StorageScale
} else {
usingStorageScale += item.StorageScale
}
}
fmt.Println(usedStorageScale)
fmt.Println(usingStorageScale)
var floatUsedStorageScale float32 = float32(usedStorageScale)
var floatUsingStorageScale float32 = float32(usingStorageScale)
var UsageRate float32 //已使用率
var UsingRate float32 //未使用率
var StorageUsed float32 //已使用量
var StorageUsing float32 //未使用量
UsageRate = (floatUsedStorageScale*1024 + 54.6) / (floatUsedStorageScale*1024 + floatUsingStorageScale*1024 + 54.6)
UsingRate = (floatUsingStorageScale*1024 + 54.6) / (floatUsedStorageScale*1024 + floatUsingStorageScale*1024 + 54.6)
StorageUsed = floatUsedStorageScale*1024 + 54.6
StorageUsing = floatUsingStorageScale*1024 + 54.6
fmt.Println(StorageUsed)
fmt.Println(StorageUsing)
fmt.Println(UsageRate)
fmt.Println(UsingRate)
resp.UsageRate = UsageRate
resp.UsageRate = UsingRate
resp.StorageUsing = StorageUsing
resp.StorageUsing = StorageUsed
resp.Code = 200
resp.Msg = "Success"
} else if statusCode != 200 {
json.Unmarshal(body, &resp)
resp.Code = 400
resp.Msg = "Failure"
}
return &resp, nil
}

View File

@ -0,0 +1,28 @@
// Code generated by goctl. DO NOT EDIT.
// Source: pcm-ceph.proto
package server
import (
"context"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/ceph"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/internal/logic"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/internal/svc"
)
type CephServer struct {
svcCtx *svc.ServiceContext
ceph.UnimplementedCephServer
}
func NewCephServer(svcCtx *svc.ServiceContext) *CephServer {
return &CephServer{
svcCtx: svcCtx,
}
}
func (s *CephServer) StorageScreen(ctx context.Context, in *ceph.StorageScreenReq) (*ceph.StorageScreenResp, error) {
l := logic.NewStorageScreenLogic(ctx, s.svcCtx)
return l.StorageScreen(in)
}

View File

@ -0,0 +1,13 @@
package svc
import "PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/internal/config"
type ServiceContext struct {
Config config.Config
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
}
}

View File

@ -1,3 +1,56 @@
syntax = "proto3";
package template;
package template;
option go_package = "/ceph";
/******************screen storage Start*************************/
message StorageScreenReq{
}
message AiCenterInfos{
string id = 1; // @gotags: copier:"id"
string name = 2; // @gotags: copier:"name"
string desc = 3; // @gotags: copier:"name"
string resource = 4;
string trainJob = 5;
int32 computeScale = 6;
int32 storageScale = 7; // @gotags: copier:"storageScale"
string province =8;
string city = 9 ; // @gotags: copier:"city"
int32 coordinateX =10;
int32 coordinateY =11;
int32 type =12;
int32 weight =13;
int32 connectionState = 14; // @gotags: copier:"connectionState"
int32 busyState =15;
string imageUrl =16;
string accDevices =17;
int64 marketTime =18;
int64 createdAt =19;
int32 accessTime =20;
int32 cardRunTime =21;
int32 jobCount=22;
}
message StorageScreenResp{
int32 totalSize = 1; // @gotags: copier:"totalSize"
repeated AiCenterInfos aiCenterInfos = 2; // @gotags: copier:"aiCenterInfos"
float StorageUsed = 3;
float StorageUsing = 4;
float UsageRate =5;
float UsingRate =6;
int32 code = 7; // @gotags: copier:"Code"
string msg = 8; // @gotags: copier:"Msg"
string error_msg =9;// @gotags: copier:"ErrorMsg"
}
/******************screen storage end*************************/
/******************screen computing power Start*************************/
/******************screen computing power End*************************/
service Ceph {
rpc storageScreen(StorageScreenReq) returns (StorageScreenResp);
}

View File

@ -0,0 +1,39 @@
package main
import (
"flag"
"fmt"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/ceph"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/internal/config"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/internal/server"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/internal/svc"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
var configFile = flag.String("f", "adaptor/PCM-STORAGE/PCM-CEPH/rpc/etc/pcmceph.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
ceph.RegisterCephServer(grpcServer, server.NewCephServer(ctx))
if c.Mode == service.DevMode || c.Mode == service.TestMode {
reflection.Register(grpcServer)
}
})
defer s.Stop()
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start()
}

View File

@ -175,3 +175,17 @@ func HttpClientWithBodyAndCode(method string, url string, payload io.Reader, tok
return res.StatusCode, body, err
}
func HttpClientWithScreen(method string, url string, payload io.Reader) (int, []byte, error) {
request, err := http.NewRequest(method, url, payload)
client := &http.Client{}
res, err := client.Do(request)
if err != nil {
log.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
return res.StatusCode, body, err
}

3
go.mod
View File

@ -1,6 +1,6 @@
module PCM
go 1.19
go 1.18
require (
github.com/JCCE-nudt/zero-contrib/zrpc/registry/nacos v0.0.0-20230419021610-13bbc83fbc3c
@ -10,7 +10,6 @@ require (
github.com/go-resty/resty/v2 v2.7.0
github.com/go-sql-driver/mysql v1.7.0
github.com/jinzhu/copier v0.3.5
github.com/jmoiron/sqlx v1.3.5
github.com/nacos-group/nacos-sdk-go/v2 v2.2.1
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1

7
go.sum
View File

@ -562,7 +562,6 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@ -744,8 +743,6 @@ github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHW
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@ -784,8 +781,6 @@ github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtB
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA=
github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA=
@ -805,8 +800,6 @@ github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=