hpcac对接

This commit is contained in:
zhangwei 2023-04-06 09:42:15 +08:00
parent d7a9f0c471
commit 0edbbd7566
18 changed files with 203 additions and 256 deletions

View File

@ -11,10 +11,9 @@ import (
"context"
"database/sql"
"encoding/json"
"github.com/zeromicro/go-zero/core/logx"
appv1 "k8s.io/api/apps/v1"
"time"
"github.com/zeromicro/go-zero/core/logx"
)
type ScheduleTaskLogic struct {
@ -43,6 +42,10 @@ func (l *ScheduleTaskLogic) ScheduleTask(req *types.ScheduleTaskReq) (resp *type
if checkResult != "" {
return resp, result2.NewDefaultError(checkResult)
}
bytes, err := json.Marshal(req.Metadata)
if err != nil {
return nil, err
}
// construct task info
task := model.Task{
Kind: req.Kind,
@ -50,6 +53,7 @@ func (l *ScheduleTaskLogic) ScheduleTask(req *types.ScheduleTaskReq) (resp *type
Status: "Saved",
Description: req.Description,
Name: req.Name,
YamlString: string(bytes),
StartTime: time.Now(),
CreatedTime: time.Now(),
UpdatedTime: time.Now(),

View File

@ -33,7 +33,7 @@ func (l *ScheduleCloudMq) Consume(_, val string) error {
return err
}
clouds := tool.UnMarshalK8sStruct(string(bytes), req.TaskId)
// 处理返回数据并入库
// 存储数据
_, err = l.svcCtx.Db.NamedExec("insert into cloud (task_id,kind,namespace,name,api_version,status) values (:task_id,:kind,:namespace,:name,:api_version,:status)", clouds)
if err != nil {
return err

View File

@ -3,6 +3,8 @@ package kq
import (
"PCM/adaptor/PCM-CORE/api/internal/svc"
"PCM/adaptor/PCM-CORE/api/internal/types"
"PCM/adaptor/PCM-CORE/model"
"PCM/common/tool"
"context"
"encoding/json"
)
@ -27,6 +29,12 @@ func (l *ScheduleHpcMq) Consume(_, val string) error {
// 接受消息
var req *types.ScheduleTaskReq
json.Unmarshal([]byte(val), &req)
var hpc model.Hpc
tool.Convert(req.Metadata, &hpc)
// 存储数据
_, err := l.svcCtx.Db.NamedExec("insert into cloud (task_id,service_name,name,status) values (:task_id,:service_name,:name,:status)", hpc)
if err != nil {
return err
}
return nil
}

View File

@ -22,6 +22,6 @@ type (
// NewTaskModel returns a model for the database table.
func NewTaskModel(conn sqlx.SqlConn, c cache.CacheConf) TaskModel {
return &customTaskModel{
defaultTaskModel: newTaskModel(conn, c),
defaultTaskModel: newTaskModel(conn),
}
}

View File

@ -10,7 +10,6 @@ import (
"time"
"github.com/zeromicro/go-zero/core/stores/builder"
"github.com/zeromicro/go-zero/core/stores/cache"
"github.com/zeromicro/go-zero/core/stores/sqlc"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/core/stringx"
@ -21,8 +20,6 @@ var (
taskRows = strings.Join(taskFieldNames, ",")
taskRowsExpectAutoSet = strings.Join(stringx.Remove(taskFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",")
taskRowsWithPlaceHolder = strings.Join(stringx.Remove(taskFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?"
cachePcmTaskIdPrefix = "cache:pcm:task:id:"
)
type (
@ -34,7 +31,7 @@ type (
}
defaultTaskModel struct {
sqlc.CachedConn
conn sqlx.SqlConn
table string
}
@ -53,38 +50,33 @@ type (
StartTime time.Time `db:"start_time"` // 开始运行时间
EndTime string `db:"end_time"` // 结束运行时间
RunningTime int64 `db:"running_time"` // 已运行时间(单位秒)
Result string `db:"result"` // 作业结果
CreatedBy int64 `db:"created_by"` // 创建人
CreatedTime time.Time `db:"created_time"` // 创建时间
UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
YamlString string `db:"yaml_string"`
Result string `db:"result"` // 作业结果
CreatedBy int64 `db:"created_by"` // 创建人
CreatedTime time.Time `db:"created_time"` // 创建时间
UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
}
)
func newTaskModel(conn sqlx.SqlConn, c cache.CacheConf) *defaultTaskModel {
func newTaskModel(conn sqlx.SqlConn) *defaultTaskModel {
return &defaultTaskModel{
CachedConn: sqlc.NewConn(conn, c),
table: "`task`",
conn: conn,
table: "`task`",
}
}
func (m *defaultTaskModel) Delete(ctx context.Context, id int64) error {
pcmTaskIdKey := fmt.Sprintf("%s%v", cachePcmTaskIdPrefix, id)
_, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
return conn.ExecCtx(ctx, query, id)
}, pcmTaskIdKey)
query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
_, err := m.conn.ExecCtx(ctx, query, id)
return err
}
func (m *defaultTaskModel) FindOne(ctx context.Context, id int64) (*Task, error) {
pcmTaskIdKey := fmt.Sprintf("%s%v", cachePcmTaskIdPrefix, id)
query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskRows, m.table)
var resp Task
err := m.QueryRowCtx(ctx, &resp, pcmTaskIdKey, func(ctx context.Context, conn sqlx.SqlConn, v interface{}) error {
query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskRows, m.table)
return conn.QueryRowCtx(ctx, v, query, id)
})
err := m.conn.QueryRowCtx(ctx, &resp, query, id)
switch err {
case nil:
return &resp, nil
@ -96,32 +88,17 @@ func (m *defaultTaskModel) FindOne(ctx context.Context, id int64) (*Task, error)
}
func (m *defaultTaskModel) Insert(ctx context.Context, data *Task) (sql.Result, error) {
pcmTaskIdKey := fmt.Sprintf("%s%v", cachePcmTaskIdPrefix, data.Id)
ret, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskRowsExpectAutoSet)
return conn.ExecCtx(ctx, query, data.ServiceId, data.JobId, data.Name, data.ServiceName, data.Description, data.Kind, data.Status, data.Strategy, data.SynergyStatus, data.CardCount, data.StartTime, data.EndTime, data.RunningTime, data.Result, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.DeletedFlag)
}, pcmTaskIdKey)
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.ServiceId, data.JobId, data.Name, data.ServiceName, data.Description, data.Kind, data.Status, data.Strategy, data.SynergyStatus, data.CardCount, data.StartTime, data.EndTime, data.RunningTime, data.YamlString, data.Result, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.DeletedFlag)
return ret, err
}
func (m *defaultTaskModel) Update(ctx context.Context, data *Task) error {
pcmTaskIdKey := fmt.Sprintf("%s%v", cachePcmTaskIdPrefix, data.Id)
_, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskRowsWithPlaceHolder)
return conn.ExecCtx(ctx, query, data.ServiceId, data.JobId, data.Name, data.ServiceName, data.Description, data.Kind, data.Status, data.Strategy, data.SynergyStatus, data.CardCount, data.StartTime, data.EndTime, data.RunningTime, data.Result, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.DeletedFlag, data.Id)
}, pcmTaskIdKey)
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskRowsWithPlaceHolder)
_, err := m.conn.ExecCtx(ctx, query, data.ServiceId, data.JobId, data.Name, data.ServiceName, data.Description, data.Kind, data.Status, data.Strategy, data.SynergyStatus, data.CardCount, data.StartTime, data.EndTime, data.RunningTime, data.YamlString, data.Result, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.DeletedFlag, data.Id)
return err
}
func (m *defaultTaskModel) formatPrimary(primary interface{}) string {
return fmt.Sprintf("%s%v", cachePcmTaskIdPrefix, primary)
}
func (m *defaultTaskModel) queryPrimary(ctx context.Context, conn sqlx.SqlConn, v, primary interface{}) error {
query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskRows, m.table)
return conn.QueryRowCtx(ctx, v, query, primary)
}
func (m *defaultTaskModel) tableName() string {
return m.table
}

View File

@ -56,7 +56,7 @@ func (l *InfoListLogic) InfoList(in *pcmCore.InfoListReq) (*pcmCore.InfoListResp
var taskId int64
var yamlString string
taskRows.Scan(&taskId, &yamlString)
result.Yaml = append(result.Yaml, yamlString)
result.YamlString = append(result.YamlString, yamlString)
taskIds = append(taskIds, taskId)
}
// 将任务状态修改为已提交

View File

@ -38,18 +38,18 @@ func (l *SyncInfoLogic) SyncInfo(in *pcmCore.SyncInfoReq) (*pcmCore.SyncInfoResp
var query string
switch in.Kind {
case "cloud":
query = "INSERT INTO cloud (task_id, namespace,name,status,running_time,start_time) VALUES "
query = "INSERT INTO cloud (service_name,task_id, namespace,name,status,running_time,start_time) VALUES "
case "hpc":
query = "INSERT INTO hpc (task_id,job_id,name,status,running_time,start_time) VALUES "
query = "INSERT INTO hpc (service_name,task_id,job_id,name,status,running_time,start_time) VALUES "
case "ai":
query = "INSERT INTO ai (task_id,project_id,name,status,running_time,start_time) VALUES "
query = "INSERT INTO ai (service_name,task_id,project_id,name,status,running_time,start_time) VALUES "
}
for i, info := range in.LatestExternalInfoList {
if i > 0 {
query += ","
}
query += fmt.Sprintf("(%d,'%s','%s','%s',%d,'%s')", info.TaskId, info.ExternalField, info.Name, info.Status, info.RunningTime, tool.TimeStringRemoveZone(info.StartTime))
query += fmt.Sprintf("('%s',%d,'%s','%s','%s',%d,'%s')", info.ServiceName, info.TaskId, info.ExternalField, info.Name, info.Status, info.RunningTime, tool.TimeStringRemoveZone(info.StartTime))
}
query += " ON DUPLICATE KEY UPDATE status=VALUES(status),start_time=VALUES(start_time),running_time=VALUES(running_time)"

View File

@ -9,6 +9,7 @@ message SyncInfoReq {
}
message LatestExternalInfo {
string serviceName = 1;
int64 taskId = 2;
string externalField = 3;
string name = 4;

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc v3.19.4
// protoc v3.21.12
// source: pcmCore.proto
package pcmCore
@ -80,6 +80,7 @@ type LatestExternalInfo struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ServiceName string `protobuf:"bytes,1,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
TaskId int64 `protobuf:"varint,2,opt,name=taskId,proto3" json:"taskId,omitempty"`
ExternalField string `protobuf:"bytes,3,opt,name=externalField,proto3" json:"externalField,omitempty"`
Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"`
@ -120,6 +121,13 @@ func (*LatestExternalInfo) Descriptor() ([]byte, []int) {
return file_pcmCore_proto_rawDescGZIP(), []int{1}
}
func (x *LatestExternalInfo) GetServiceName() string {
if x != nil {
return x.ServiceName
}
return ""
}
func (x *LatestExternalInfo) GetTaskId() int64 {
if x != nil {
return x.TaskId
@ -277,7 +285,7 @@ type InfoListResp struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Yaml []string `protobuf:"bytes,1,rep,name=yaml,proto3" json:"yaml,omitempty"`
YamlString []string `protobuf:"bytes,1,rep,name=yamlString,proto3" json:"yamlString,omitempty"`
ExternalInfoList []*ExternalInfo `protobuf:"bytes,2,rep,name=externalInfoList,proto3" json:"externalInfoList,omitempty"`
}
@ -313,9 +321,9 @@ func (*InfoListResp) Descriptor() ([]byte, []int) {
return file_pcmCore_proto_rawDescGZIP(), []int{4}
}
func (x *InfoListResp) GetYaml() []string {
func (x *InfoListResp) GetYamlString() []string {
if x != nil {
return x.Yaml
return x.YamlString
}
return nil
}
@ -410,49 +418,52 @@ var file_pcmCore_proto_rawDesc = []byte{
0x6d, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x45, 0x78, 0x74, 0x65,
0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x16, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74,
0x45, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74,
0x22, 0xbe, 0x01, 0x0a, 0x12, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x45, 0x78, 0x74, 0x65, 0x72,
0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49,
0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12,
0x24, 0x0a, 0x0d, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x46, 0x69, 0x65, 0x6c, 0x64,
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x46, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20,
0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61,
0x74, 0x75, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x06,
0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12,
0x20, 0x0a, 0x0b, 0x72, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x07,
0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x72, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x54, 0x69, 0x6d,
0x65, 0x22, 0x34, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73,
0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52,
0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x22, 0x43, 0x0a, 0x0b, 0x49, 0x6e, 0x66, 0x6f, 0x4c,
0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x73, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x0b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x65, 0x0a, 0x0c,
0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04,
0x79, 0x61, 0x6d, 0x6c, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x79, 0x61, 0x6d, 0x6c,
0x12, 0x41, 0x0a, 0x10, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f,
0x4c, 0x69, 0x73, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x70, 0x63, 0x6d,
0x43, 0x6f, 0x72, 0x65, 0x2e, 0x45, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66,
0x6f, 0x52, 0x10, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f, 0x4c,
0x69, 0x73, 0x74, 0x22, 0x78, 0x0a, 0x0c, 0x45, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49,
0x6e, 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x03, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x65,
0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0d, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x46, 0x69, 0x65, 0x6c,
0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18,
0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0x7b, 0x0a,
0x07, 0x70, 0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65, 0x12, 0x37, 0x0a, 0x08, 0x53, 0x79, 0x6e, 0x63,
0x49, 0x6e, 0x66, 0x6f, 0x12, 0x14, 0x2e, 0x70, 0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x53,
0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x1a, 0x15, 0x2e, 0x70, 0x63, 0x6d,
0x43, 0x6f, 0x72, 0x65, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73,
0x70, 0x12, 0x37, 0x0a, 0x08, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x14, 0x2e,
0x70, 0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74,
0x52, 0x65, 0x71, 0x1a, 0x15, 0x2e, 0x70, 0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x49, 0x6e,
0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x42, 0x0a, 0x5a, 0x08, 0x2f, 0x70,
0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x22, 0xe0, 0x01, 0x0a, 0x12, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x45, 0x78, 0x74, 0x65, 0x72,
0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x20, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x73,
0x6b, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49,
0x64, 0x12, 0x24, 0x0a, 0x0d, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x46, 0x69, 0x65,
0x6c, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e,
0x61, 0x6c, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18,
0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61,
0x74, 0x75, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65,
0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d,
0x65, 0x12, 0x20, 0x0a, 0x0b, 0x72, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x54, 0x69, 0x6d, 0x65,
0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x72, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x54,
0x69, 0x6d, 0x65, 0x22, 0x34, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x52,
0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x22, 0x43, 0x0a, 0x0b, 0x49, 0x6e, 0x66,
0x6f, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x20, 0x0a, 0x0b,
0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x71,
0x0a, 0x0c, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x12, 0x1e,
0x0a, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x01, 0x20, 0x03,
0x28, 0x09, 0x52, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x41,
0x0a, 0x10, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69,
0x73, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x70, 0x63, 0x6d, 0x43, 0x6f,
0x72, 0x65, 0x2e, 0x45, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f, 0x52,
0x10, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73,
0x74, 0x22, 0x78, 0x0a, 0x0c, 0x45, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x6e, 0x66,
0x6f, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x03, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x65, 0x78, 0x74,
0x65, 0x72, 0x6e, 0x61, 0x6c, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0d, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12,
0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e,
0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x04, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0x7b, 0x0a, 0x07, 0x70,
0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65, 0x12, 0x37, 0x0a, 0x08, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e,
0x66, 0x6f, 0x12, 0x14, 0x2e, 0x70, 0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x53, 0x79, 0x6e,
0x63, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x1a, 0x15, 0x2e, 0x70, 0x63, 0x6d, 0x43, 0x6f,
0x72, 0x65, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x12,
0x37, 0x0a, 0x08, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x14, 0x2e, 0x70, 0x63,
0x6d, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65,
0x71, 0x1a, 0x15, 0x2e, 0x70, 0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x49, 0x6e, 0x66, 0x6f,
0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x42, 0x0a, 0x5a, 0x08, 0x2f, 0x70, 0x63, 0x6d,
0x43, 0x6f, 0x72, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v3.19.4
// - protoc v3.21.12
// source: pcmCore.proto
package pcmCore

View File

@ -16,4 +16,13 @@ OrgId: "313ae32df03bc116255e6808949fcf57"
Layout: "2006-01-02 15:04:05"
EndPoint: https://api01.hpccube.com:65106
Token:
ClusterID:
ClusterID:
# core rpc
PcmCoreRpcConf:
Etcd:
Hosts:
- 10.101.15.170:31890
Key: pcmCore.rpc
User: root
Pass: I9wLvrRufj

View File

@ -1,13 +1,18 @@
package main
import (
"flag"
"github.com/zeromicro/go-zero/core/logx"
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/config"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/logic"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/server"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/svc"
"PCM/common/param"
"PCM/common/tool"
"context"
"flag"
"github.com/zeromicro/go-zero/core/logx"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
@ -26,7 +31,7 @@ func main() {
// start log component
logx.MustSetup(c.LogConf)
ctx := svc.NewServiceContext(c)
ctx.Cron.Start()
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
hpcAC.RegisterHpcACServer(grpcServer, server.NewHpcACServer(ctx))
@ -37,5 +42,62 @@ func main() {
defer s.Stop()
logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
initCron(ctx)
s.Start()
}
func initCron(svc *svc.ServiceContext) {
svc.Cron.AddFunc("*/5 * * * * ?", func() {
SyncInfoReq := pcmcoreclient.SyncInfoReq{
Kind: "hpc",
}
// 查询core端分发下来的任务列表
infoReq := pcmcoreclient.InfoListReq{
Kind: "hpc",
ServiceName: "ac",
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if err != nil {
logx.Error(err)
return
}
// 提交任务
go func() {
submitJobLogic := logic.NewSubmitJobLogic(context.Background(), svc)
for _, yamlString := range infoList.YamlString {
bytes, err := json.Marshal(yamlString)
if err != nil {
return
}
var params param.HpcBase
tool.Convert(bytes, &params)
submitReq := hpcAC.SubmitJobReq{}
submitJobLogic.SubmitJob(&submitReq)
}
}()
// 查询运行中的任务列表
listLogic := logic.NewListJobLogic(context.Background(), svc)
listReq := hpcAC.ListJobReq{}
listJob, err := listLogic.ListJob(&listReq)
if err != nil {
return
}
for _, taskInfo := range infoList.ExternalInfoList {
for _, job := range listJob.Jobs {
if job.JobId == taskInfo.ExternalField {
external := pcmcoreclient.LatestExternalInfo{
ServiceName: "ac",
TaskId: taskInfo.TaskId,
ExternalField: taskInfo.ExternalField,
Name: taskInfo.Name,
StartTime: job.JobStartTime,
RunningTime: int64(tool.StringToInt(job.JobRunTime)),
}
SyncInfoReq.LatestExternalInfoList = append(SyncInfoReq.LatestExternalInfoList, &external)
}
}
}
// 同步信息到core端
svc.PcmCoreRpc.SyncInfo(context.Background(), &SyncInfoReq)
})
}

View File

@ -8,6 +8,6 @@ import (
type Config struct {
zrpc.RpcServerConf
ShuguangConf
LogConf logx.LogConf
PcmCoreRpcConf zrpc.RpcClientConf
LogConf logx.LogConf
}

View File

@ -1,15 +1,22 @@
package svc
import (
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/config"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-zero/zrpc"
)
type ServiceContext struct {
Config config.Config
Config config.Config
Cron *cron.Cron
PcmCoreRpc pcmcoreclient.PcmCore
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
Config: c,
Cron: cron.New(cron.WithSeconds()),
PcmCoreRpc: pcmcoreclient.NewPcmCore(zrpc.MustNewClient(c.PcmCoreRpcConf)),
}
}

View File

@ -1,124 +0,0 @@
// KqMessage
package message
// Hpc-ac
type HpcSubmitMessage struct {
TaskId int64 `json:"taskId"`
SlurmVersion string `json:"slurmVersion"`
Apptype string `json:"apptype,optional"`
Appname string `json:"appname,optional"`
StrJobManagerID int64 `json:"strJobManagerID,optional"`
MapAppJobInfo MapAppJobInfo `json:"mapAppJobInfo,optional"`
Account string `json:"account,optional"` //
Acctg_freq string `json:"acctg_freq,optional"`
Alloc_node string `json:"alloc_node,optional"`
Alloc_resp_port int32 `json:"alloc_resp_port,optional"`
Alloc_sid int32 `json:"alloc_sid,optional"`
Argc int32 `json:"argc,optional"`
Argv []Argv `json:"Argv,optional"`
Array_inx string `json:"array_inx,optional"`
Begin_time int64 `json:"begin_time,optional"`
Ckpt_interval int32 `json:"ckpt_interval,optional"`
Ckpt_dir string `json:"ckpt_dir,optional"`
Comment string `json:"comment,optional"`
Contiguous int32 `json:"contiguous,optional"`
Cpu_bind string `json:"cpu_bind,optional"`
Cpu_bind_type int32 `json:"cpu_bind_type,optional"`
Dependency string `json:"dependency,optional"`
End_time int64 `json:"end_time,optional"`
Environment []Environment `json:"Environment,optional"`
Env_size int32 `json:"env_size,optional"`
Exc_nodes string `json:"exc_nodes,optional"`
Features string `json:"features,optional"`
Gres string `json:"gres,optional"`
Group_id int32 `json:"group_id,optional"`
Immediate int32 `json:"immediate,optional"`
Job_id int32 `json:"job_id,optional"`
Kill_on_node_fail int32 `json:"kill_on_node_fail,optional"`
Licenses string `json:"licenses,optional"`
Mail_type int32 `json:"mail_type,optional"`
Mail_user string `json:"mail_user,optional"`
Mem_bind string `json:"mem_bind,optional"`
Mem_bind_type int32 `json:"mem_bind_type,optional"`
Name string `json:"name,optional"` //
Network string `json:"network,optional"`
Nice int32 `json:"nice,optional"`
Num_tasks int32 `json:"num_tasks,optional"`
Open_mode int32 `json:"open_mode,optional"`
Other_port int32 `json:"other_port,optional"`
Overcommit int32 `json:"overcommit,optional"`
Partition string `json:"partition,optional"`
Plane_size int32 `json:"plane_size,optional"`
Priority int32 `json:"priority,optional"`
Profile int32 `json:"profile,optional"`
Qos string `json:"qos,optional"`
Resp_host string `json:"resp_host,optional"`
Req_nodes string `json:"req_nodes,optional"`
Requeue int32 `json:"requeue,optional"`
Reservation string `json:"reservation,optional"`
Script string `json:"script,optional"` //
Shared int32 `json:"shared,optional"`
Spank_job_env_size int32 `json:"spank_job_env_size,optional"`
Task_dist int32 `json:"task_dist,optional"`
Time_limit int32 `json:"time_limit,optional"`
Time_min int32 `json:"time_min,optional"`
User_id int32 `json:"user_id,optional"` //
Wait_all_nodes int32 `json:"wait_all_nodes,optional"`
Warn_signal int32 `json:"warn_signal,optional"`
Warn_time int32 `json:"warn_time,optional"`
Work_dir string `json:"work_dir,optional"`
Cpus_per_task int32 `json:"cpus_per_task,optional"`
Min_cpus int32 `json:"min_cpus,optional"` //
Max_cpus int32 `json:"max_cpus,optional"`
Min_nodes int32 `json:"min_nodes,optional"`
Max_nodes int32 `json:"max_nodes,optional"`
Boards_per_node int32 `json:"boards_per_node,optional"`
Sockets_per_board int32 `json:"sockets_per_board,optional"`
Sockets_per_node int32 `json:"sockets_per_node,optional"`
Cores_per_socket int32 `json:"cores_per_socket,optional"`
Threads_per_core int32 `json:"threads_per_core,optional"`
Ntasks_per_node int32 `json:"ntasks_per_node,optional"`
Ntasks_per_socket int32 `json:"ntasks_per_socket,optional"`
Ntasks_per_core int32 `json:"ntasks_per_core,optional"`
Ntasks_per_board int32 `json:"ntasks_per_board,optional"`
Pn_min_cpus int32 `json:"pn_min_cpus,optional"`
Pn_min_memory int32 `json:"pn_min_memory,optional"`
Pn_min_tmp_disk int32 `json:"pn_min_tmp_disk,optional"`
Reboot int32 `json:"reboot,optional"`
Rotate int32 `json:"rotate,optional"`
Req_switch int32 `json:"req_switch,optional"`
Std_err string `json:"std_err,optional"`
Std_in string `json:"std_in,optional"`
Std_out string `json:"std_out,optional"`
Wait4switch int32 `json:"wait4switch,optional"`
Wckey string `json:"wckey,optional"`
}
type Argv struct {
Argv string `json:"argv,optional"`
}
type Environment struct {
Environment string `json:"environment,optional"`
}
type MapAppJobInfo struct {
GAP_CMD_FILE string `json:"GAP_CMD_FILE"` //命令行内容
GAP_NNODE string `json:"GAP_NNODE"` //节点个数当指定该参数时GAP_NODE_STRING必须为""
GAP_NODE_STRING string `json:"GAP_NODE_STRING,optional"` //指定节点当指定该参数时GAP_NNODE必须为""
GAP_SUBMIT_TYPE string `json:"GAP_SUBMIT_TYPE"` //cmd命令行模式
GAP_JOB_NAME string `json:"GAP_JOB_NAME"` //作业名称
GAP_WORK_DIR string `json:"GAP_WORK_DIR"` //工作路径
GAP_QUEUE string `json:"GAP_QUEUE"` //队列名称
GAP_NPROC string `json:"GAP_NPROC,optional"` //总核心数GAP_NPROC和GAP_PPN选其一填写
GAP_PPN string `json:"GAP_PPN,optional"` //CPU核心/节点GAP_NPROC和GAP_PPN选其一填写
GAP_NGPU string `json:"GAP_NGPU,optional"` //GPU卡数/节点
GAP_NDCU string `json:"GAP_NDCU,optional"` //DCU卡数/节点
GAP_JOB_MEM string `json:"GAP_JOB_MEM,optional"` //每个节点内存值单位为MB/GB
GAP_WALL_TIME string `json:"GAP_WALL_TIME"` //最大运行时长HH:MM:ss
GAP_EXCLUSIVE string `json:"GAP_EXCLUSIVE,optional"` // 是否独占节点1为独占空为非独占
GAP_APPNAME string `json:"GAP_APPNAME"` //BASE基础应用支持填写具体的应用英文名称
GAP_MULTI_SUB string `json:"GAP_MULTI_SUB,optional"` //作业组长度建议为小于等于50的正整数
GAP_STD_OUT_FILE string `json:"GAP_STD_OUT_FILE"` //工作路径/std.out.%j
GAP_STD_ERR_FILE string `json:"GAP_STD_ERR_FILE"` //工作路径/std.err.%j
}

7
common/param/hpcBase.go Normal file
View File

@ -0,0 +1,7 @@
package param
type HpcBase struct {
JobId string `json:"jobId"`
Name string `json:"name"`
WorkDir string `json:"workDir"`
}

View File

@ -39,3 +39,11 @@ func TimeStringRemoveZone(tm string) string {
timeZone, _ := time.Parse("2006-01-02 15:04:05 -0700 MST", tm)
return timeZone.Format("2006-01-02 15:04:05")
}
func TimeRemoveZone(tm time.Time) time.Time {
parse, err := time.Parse("2006-01-02 15:04:05", tm.Format("2006-01-02 15:04:05"))
if err != nil {
return time.Time{}
}
return parse
}

View File

@ -1,23 +0,0 @@
package uniqueid
import (
"github.com/sony/sonyflake"
"github.com/zeromicro/go-zero/core/logx"
)
var flake *sonyflake.Sonyflake
func init() {
flake = sonyflake.NewSonyflake(sonyflake.Settings{})
}
func GenId() int64 {
id, err := flake.NextID()
if err != nil {
logx.Severef("flake NextID failed with %s \n", err)
panic(err)
}
return int64(id)
}