文件上传

This commit is contained in:
zhangwei 2023-07-04 11:47:24 +08:00
parent 088cc41d99
commit 72f094fbec
15 changed files with 221 additions and 159 deletions

View File

@ -191,11 +191,11 @@ service pcm {
group : image
)
service pcm {
@handler uploadImageHandler
post /image/upload () returns ()
@handler uploadHandler
post /upload () returns ()
@handler chunkImageHandler
post /image/chunk () returns ()
@handler chunkHandler
post /chunk () returns ()
@handler imageListHandler
get /image/list () returns (imageListResp)

View File

@ -34,4 +34,9 @@ type Config struct {
OctopusRpcConf zrpc.RpcClientConf
NexusUrl string
JccScheduleUrl string
MinioConf struct {
Secret string
AccessKey string
Endpoint string
}
}

View File

@ -0,0 +1,162 @@
package image
import (
"PCM/adaptor/PCM-CORE/model"
result2 "PCM/common/result"
"bufio"
"context"
"encoding/base64"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
types2 "github.com/docker/docker/api/types"
"io/ioutil"
"k8s.io/apimachinery/pkg/util/json"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"PCM/adaptor/PCM-CORE/api/internal/svc"
)
var dir, _ = os.Getwd()
var uploadPath = filepath.Join(dir, "uploads")
func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
size, _ := strconv.ParseInt(r.PostFormValue("size"), 10, 64)
hash := r.PostFormValue("hash")
name := r.PostFormValue("name")
dataType := r.PostFormValue("dataType")
// 对比合并请求的文件大小和已上传文件夹大小
toSize, _ := getDirSize(filepath.Join(uploadTempPath, hash))
if size != toSize {
fmt.Fprintf(w, "文件上传错误")
}
chunksPath := filepath.Join(uploadTempPath, hash)
files, _ := ioutil.ReadDir(chunksPath)
// 将文件根据索引序号排序
filesSort := make(map[string]string)
for _, f := range files {
nameArr := strings.Split(f.Name(), "-")
filesSort[nameArr[1]] = f.Name()
}
saveFile := filepath.Join(uploadPath, name)
if exists, _ := PathExists(saveFile); exists {
os.Remove(saveFile)
}
fs, _ := os.OpenFile(saveFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModeAppend|os.ModePerm)
var wg sync.WaitGroup
filesCount := len(files)
if filesCount != len(filesSort) {
fmt.Fprintf(w, "文件上传错误2")
}
wg.Add(filesCount)
for i := 0; i < filesCount; i++ {
// 这里一定要注意按顺序读取不然文件就会损坏
fileName := filepath.Join(chunksPath, filesSort[strconv.Itoa(i)])
data, err := ioutil.ReadFile(fileName)
fmt.Println(err)
fs.Write(data)
wg.Done()
}
wg.Wait()
os.RemoveAll(chunksPath)
defer fs.Close()
// 保存到数据库表里
svcCtx.DbEngin.Create(&model.File{
Name: name,
Hash: hash,
Type: dataType,
Status: "0",
Bucket: "pcm"})
// 根据数据类型按需上传镜像推送到nexus 数据集和算法推送到云际存储)
//switch dataType {
//case "image":
// pushImage(svcCtx, name)
//case "dataSet", "algorithm":
// uploadStorage(svcCtx, name)
//}
err := pushImage(svcCtx, name)
if err != nil {
return
}
result2.HttpResult(r, w, nil, err)
}
}
func uploadStorage(svcCtx *svc.ServiceContext, name string) error {
svcCtx.Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String("pcm"),
Key: aws.String(name),
})
return nil
}
func pushImage(svcCtx *svc.ServiceContext, name string) error {
// 加载镜像文件到docker
fileInfo, err := os.Open(filepath.Join(uploadPath, name))
reader := bufio.NewReader(fileInfo)
if err != nil {
return err
}
body, err := svcCtx.DockerClient.ImageLoad(context.Background(), reader, false)
if err != nil {
return err
}
bytes, err := ioutil.ReadAll(body.Body)
loadBody := LoadBody{}
err = json.Unmarshal(bytes, &loadBody)
if err != nil {
return err
}
imageName := strings.TrimSpace(loadBody.Stream[13:])
privateImageName := "hub.jcce.dev:18445/" + imageName
// 给镜像打上私有仓库的tag
err = svcCtx.DockerClient.ImageTag(context.Background(), imageName, privateImageName)
if err != nil {
return err
}
// 推送镜像到registry
authConfig := types2.AuthConfig{
Username: "admin",
Password: "Nudt@123",
}
authConfigBytes, err := json.Marshal(authConfig)
authStr := base64.URLEncoding.EncodeToString(authConfigBytes)
_, err = svcCtx.DockerClient.ImagePush(context.Background(), privateImageName, types2.ImagePushOptions{RegistryAuth: authStr})
if err != nil {
return err
}
println("传输完成!")
// 删除本地镜像 避免存储资源浪费
_, err = svcCtx.DockerClient.ImageRemove(context.Background(), privateImageName, types2.ImageRemoveOptions{})
if err != nil {
return err
}
// 删除本地文件 避免占用本地存储资源
err = os.Remove(filepath.Join(uploadPath, name))
if err != nil {
return err
}
return nil
}
// DirSize 获取整体文件夹大小
func getDirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if !info.IsDir() {
size += info.Size()
}
return err
})
return size, err
}

View File

@ -1,127 +0,0 @@
package image
import (
result2 "PCM/common/result"
"bufio"
"context"
"encoding/base64"
"fmt"
types2 "github.com/docker/docker/api/types"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/rest/httpx"
"io/ioutil"
"k8s.io/apimachinery/pkg/util/json"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"PCM/adaptor/PCM-CORE/api/internal/svc"
)
var dir, _ = os.Getwd()
var uploadPath = filepath.Join(dir, "uploads")
func ChunkImageHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
size, _ := strconv.ParseInt(r.PostFormValue("size"), 10, 64)
hash := r.PostFormValue("hash")
name := r.PostFormValue("name")
// 对比合并请求的文件大小和已上传文件夹大小
toSize, _ := getDirSize(filepath.Join(uploadTempPath, hash))
if size != toSize {
fmt.Fprintf(w, "文件上传错误")
}
chunksPath := filepath.Join(uploadTempPath, hash)
files, _ := ioutil.ReadDir(chunksPath)
// 将文件根据索引序号排序
filesSort := make(map[string]string)
for _, f := range files {
nameArr := strings.Split(f.Name(), "-")
filesSort[nameArr[1]] = f.Name()
}
saveFile := filepath.Join(uploadPath, name)
if exists, _ := PathExists(saveFile); exists {
os.Remove(saveFile)
}
fs, _ := os.OpenFile(saveFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModeAppend|os.ModePerm)
var wg sync.WaitGroup
filesCount := len(files)
if filesCount != len(filesSort) {
fmt.Fprintf(w, "文件上传错误2")
}
wg.Add(filesCount)
for i := 0; i < filesCount; i++ {
// 这里一定要注意按顺序读取不然文件就会损坏
fileName := filepath.Join(chunksPath, filesSort[strconv.Itoa(i)])
data, err := ioutil.ReadFile(fileName)
fmt.Println(err)
fs.Write(data)
wg.Done()
}
wg.Wait()
os.RemoveAll(chunksPath)
defer fs.Close()
// 加载镜像文件到docker
fileInfo, err := os.Open(filepath.Join(uploadPath, name))
reader := bufio.NewReader(fileInfo)
if err != nil {
return
}
body, err := svcCtx.DockerClient.ImageLoad(context.Background(), reader, false)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
bytes, err := ioutil.ReadAll(body.Body)
loadBody := LoadBody{}
err = json.Unmarshal(bytes, &loadBody)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
imageName := strings.TrimSpace(loadBody.Stream[13:])
privateImageName := "hub.jcce.dev:18445/repository/pcm/" + imageName
// 给镜像打上私有仓库的tag
err = svcCtx.DockerClient.ImageTag(context.Background(), imageName, privateImageName)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
// 推送镜像到registry
authConfig := types2.AuthConfig{
Username: "admin",
Password: "Nudt@123",
}
authConfigBytes, err := json.Marshal(authConfig)
authStr := base64.URLEncoding.EncodeToString(authConfigBytes)
_, err = svcCtx.DockerClient.ImagePush(context.Background(), privateImageName, types2.ImagePushOptions{RegistryAuth: authStr})
if err != nil {
logx.Error(err.Error())
return
}
println("传输完成!")
// 删除本地镜像 避免存储资源浪费
_, err = svcCtx.DockerClient.ImageRemove(context.Background(), privateImageName, types2.ImageRemoveOptions{})
if err != nil {
return
}
result2.HttpResult(r, w, nil, err)
}
}
// DirSize 获取整体文件夹大小
func getDirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if !info.IsDir() {
size += info.Size()
}
return err
})
return size, err
}

View File

@ -20,16 +20,17 @@ type LoadBody struct {
var uploadTempPath = filepath.Join(uploadPath, "temp")
func UploadImageHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
func UploadHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
file, fileHeader, err := r.FormFile("file")
index := r.PostFormValue("index")
hash := r.PostFormValue("hash")
// 合并路径
chunksPath := filepath.Join(uploadTempPath, hash)
// 文件路径
filePath := filepath.Join(chunksPath, hash+"-"+index)
// 检查临时文件夹是否存在
// 检查临时文件夹是否存在 不存在则创建文件夹
isPathExists, err := PathExists(chunksPath)
if !isPathExists {
err = os.MkdirAll(chunksPath, os.ModePerm)
@ -81,7 +82,6 @@ func UploadImageHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
defer file.Close()
defer destFile.Close()
}
result2.HttpResult(r, w, nil, err)
}
}

View File

@ -283,13 +283,13 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
[]rest.Route{
{
Method: http.MethodPost,
Path: "/image/upload",
Handler: image.UploadImageHandler(serverCtx),
Path: "/upload",
Handler: image.UploadHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/image/chunk",
Handler: image.ChunkImageHandler(serverCtx),
Path: "/chunk",
Handler: image.ChunkHandler(serverCtx),
},
{
Method: http.MethodGet,
@ -308,5 +308,6 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
},
},
rest.WithPrefix("/pcm/v1"),
rest.WithMaxBytes(1111111111),
)
}

View File

@ -7,21 +7,21 @@ import (
"github.com/zeromicro/go-zero/core/logx"
)
type ChunkImageLogic struct {
type ChunkLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewChunkImageLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ChunkImageLogic {
return &ChunkImageLogic{
func NewChunkLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ChunkLogic {
return &ChunkLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ChunkImageLogic) ChunkImage() error {
func (l *ChunkLogic) Chunk() error {
// todo: add your logic here and delete this line
return nil

View File

@ -26,7 +26,7 @@ func NewDataSetCheckLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Data
func (l *DataSetCheckLogic) DataSetCheck(req *types.CheckReq) (resp *types.CheckResp, err error) {
resp = &types.CheckResp{}
var dataSets []model.DataSet
var dataSets []model.File
l.svcCtx.DbEngin.Find(&dataSets).Where("md5", req.FileMd5)
if len(dataSets) != 0 {
resp.Exist = true

View File

@ -1,9 +1,8 @@
package image
import (
"context"
"PCM/adaptor/PCM-CORE/api/internal/svc"
"context"
"github.com/zeromicro/go-zero/core/logx"
)
@ -22,7 +21,6 @@ func NewUploadDataSetLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Upl
}
func (l *UploadDataSetLogic) UploadDataSet() error {
// todo: add your logic here and delete this line
return nil
}

View File

@ -1,27 +1,28 @@
package image
import (
"PCM/adaptor/PCM-CORE/api/internal/svc"
"PCM/adaptor/PCM-CORE/api/internal/types"
"context"
"PCM/adaptor/PCM-CORE/api/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type UploadImageLogic struct {
type UploadLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewUploadImageLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UploadImageLogic {
return &UploadImageLogic{
func NewUploadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UploadLogic {
return &UploadLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *UploadImageLogic) UploadImage(req *types.UploadImageReq) error {
func (l *UploadLogic) Upload() error {
// todo: add your logic here and delete this line
return nil
}

View File

@ -7,6 +7,10 @@ import (
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcacclient"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/hpcthclient"
"PCM/adaptor/PCM-STORAGE/PCM-CEPH/rpc/cephclient"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/docker/docker/client"
"github.com/go-redis/redis/v8"
"github.com/robfig/cron/v3"
@ -32,9 +36,21 @@ type ServiceContext struct {
OctopusRpc octopusclient.Octopus
CephRpc cephclient.Ceph
DockerClient *client.Client
Downloader *s3manager.Downloader
Uploader *s3manager.Uploader
}
func NewServiceContext(c config.Config) *ServiceContext {
// 创建s3 session
session, _ := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(c.MinioConf.AccessKey, c.MinioConf.Secret, ""), //使用静态凭据,硬编码
Endpoint: aws.String(c.MinioConf.Endpoint), //配置端点
Region: aws.String("default"), //配置区域
DisableSSL: aws.Bool(false), //是否禁用https,这里表示不禁用,即使用HTTPS
S3ForcePathStyle: aws.Bool(true), //使用路径样式而非虚拟主机样式,区别请参考:https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
})
downloader := s3manager.NewDownloader(session)
uploader := s3manager.NewUploader(session)
//启动Gorm支持
dbEngin, _ := gorm.Open(mysql.Open(c.DB.DataSource), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
@ -62,5 +78,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
DockerClient: dockerClient,
Downloader: downloader,
Uploader: uploader,
}
}

View File

@ -190,7 +190,7 @@ type JobTotalResp struct {
type TrainJob struct {
Name string `json:"name"`
Status string `json:"status"`
ServiceName string `json:"ServiceName"`
ServiceName string `json:"serviceName"`
SynergyStatus string `json:"synergyStatus"`
Strategy int `json:"strategy"`
}

View File

@ -12,14 +12,15 @@ import "time"
// assembly: hongmouer.his.models.go
// class:HongMouer.HIS.Models.DataSet
// version:2023-05-06 09:58
type DataSet struct {
Id *int `gorm:"column:primaryKey;id" json:"Id"` //type:*int comment:id version:2023-05-06 09:58
type File struct {
Id *int `gorm:"column:id" json:"Id"` //type:*int comment:id version:2023-05-06 09:58
Name string `gorm:"column:name" json:"Name"` //type:string comment:文件名称 version:2023-05-06 09:58
Md5 string `gorm:"column:md5" json:"Md5"` //type:string comment:md5 version:2023-05-06 09:58
Type string `gorm:"column:type" json:"Type"` //type:string comment: version:2023-05-06 09:58
Suffix string `gorm:"column:suffix" json:"Suffix"` //type:string comment:后缀名 version:2023-05-06 09:58
Bucket string `gorm:"column:bucket" json:"Bucket"` //type:string comment:桶 version:2023-05-06 09:58
Size *int `gorm:"column:size" json:"Size"` //type:*int comment:大小 version:2023-05-06 09:58
Hash string `gorm:"column:hash" json:"Hash"` //type:string comment:hash version:2023-05-06 09:58
Status string `gorm:"column:status" json:"Status"` //type:string comment:hash version:2023-05-06 09:58
Size int64 `gorm:"column:size" json:"Size"` //type:*int comment:大小 version:2023-05-06 09:58
DeletedFlag *int `gorm:"column:deleted_flag" json:"DeletedFlag"` //type:*int comment:是否删除 version:2023-05-06 09:58
CreatedBy *int `gorm:"column:created_by" json:"CreatedBy"` //type:*int comment:创建人 version:2023-05-06 09:58
CreatedTime *time.Time `gorm:"column:created_time" json:"CreatedTime"` //type:*time.Time comment:创建时间 version:2023-05-06 09:58
@ -30,5 +31,5 @@ type DataSet struct {
// TableName 表名:data_set
// 说明:
func TableName() string {
return "data_set"
return "t_file"
}

3
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/JCCE-nudt/zero-contrib/zrpc/registry/nacos v0.0.0-20230419021610-13bbc83fbc3c
github.com/Masterminds/squirrel v1.5.4
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1704
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go v1.44.294
github.com/aws/aws-sdk-go-v2/config v1.18.25
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.67
github.com/aws/aws-sdk-go-v2/service/s3 v1.33.1
@ -38,6 +38,7 @@ require (
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.18.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 // indirect

2
go.sum
View File

@ -433,6 +433,8 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.1704/go.mod h1:RcDobYh8k5VP6TNybz9m
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.44.294 h1:3x7GaEth+pDU9HwFcAU0awZlEix5CEdyIZvV08SlHa8=
github.com/aws/aws-sdk-go v1.44.294/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs=