Support dataMigrate (#2592)

* init dataMigrate

Signed-off-by: zwwhdls <zww@hdls.me>

* fix static check

Signed-off-by: zwwhdls <zww@hdls.me>

* init juicefs data migrate

Signed-off-by: zwwhdls <zww@hdls.me>

* fix static check

Signed-off-by: zwwhdls <zww@hdls.me>

* init juicefs datamigrate chart

Signed-off-by: zwwhdls <zww@hdls.me>

* update crd yaml

Signed-off-by: zwwhdls <zww@hdls.me>

* finish juicefs sync

Signed-off-by: zwwhdls <zww@hdls.me>

* update crd

Signed-off-by: zwwhdls <zww@hdls.me>

* fix chart version

Signed-off-by: zwwhdls <zww@hdls.me>

* fix unit test

Signed-off-by: zwwhdls <zww@hdls.me>

* add unit test

Signed-off-by: zwwhdls <zww@hdls.me>

* support juicefs enterprise

Signed-off-by: zwwhdls <zww@hdls.me>

* add block in datamigrate type

Signed-off-by: zwwhdls <zww@hdls.me>

* add runtimeType

Signed-off-by: zwwhdls <zww@hdls.me>

* update openapi

Signed-off-by: zwwhdls <zww@hdls.me>

* add e2e test for datamigrate

Signed-off-by: zwwhdls <zww@hdls.me>

* fix del handle & fix e2e test & update pending handle

Signed-off-by: zwwhdls <zww@hdls.me>

* return error when runtime not implement migrate

Signed-off-by: zwwhdls <zww@hdls.me>

* add event in datamigrate when dataset not found

Signed-off-by: zwwhdls <zww@hdls.me>

---------

Signed-off-by: zwwhdls <zww@hdls.me>
Weiwei authored on 2023-03-01 10:43:41 +08:00; committed by GitHub
parent c39b817d53
commit 2c134c8e21
54 changed files with 3909 additions and 64 deletions

PROJECT
View File

@@ -78,10 +78,18 @@ resources:
version: v1alpha1
- api:
crdVersion: v1
namespaced: false
domain: fluid.io
group: data
kind: ThinRuntimeProfile
path: github.com/fluid-cloudnative/fluid/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
namespaced: true
controller: true
domain: fluid.io
group: data
kind: DataMigrate
path: github.com/fluid-cloudnative/fluid/api/v1alpha1
version: v1alpha1
version: "3"

View File

@@ -0,0 +1,132 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// DataMigrateSpec defines the desired state of DataMigrate
type DataMigrateSpec struct {
// The version information that instructs fluid to orchestrate a particular version for data migrate.
// +optional
VersionSpec `json:",inline,omitempty"`
// the source of the data migration, either a dataset or external storage
From DataToMigrate `json:"from"`
// the destination of the data migration, either a dataset or external storage
To DataToMigrate `json:"to"`
// whether DataMigrate blocks dataset usage during migration; default is false
// +optional
Block bool `json:"block,omitempty"`
// the runtime type to use for migrating data; defaults to the dataset's runtime if unset
// +optional
RuntimeType string `json:"runtimeType,omitempty"`
// options for the migration, specific to each runtime
// +optional
Options map[string]string `json:"options,omitempty"`
// the migration policy, one of None, Once, Cron, OnEvent
// +optional
Policy Policy `json:"policy,omitempty"`
// The schedule in Cron format, only set when policy is cron, see https://en.wikipedia.org/wiki/Cron.
// +optional
Schedule string `json:"schedule,omitempty"`
// PodMetadata defines labels and annotations that will be propagated to DataMigrate pods
PodMetadata PodMetadata `json:"podMetadata,omitempty"`
}
type Policy string
const (
// None means no policy is set; the effective default is Once
None Policy = "None"
// Once run data migrate once
Once Policy = "Once"
// Cron run data migrate by cron
Cron Policy = "Cron"
// OnEvent run data migrate when event occurs
OnEvent Policy = "OnEvent"
)
type DataToMigrate struct {
// dataset to migrate
DataSet *DatasetToMigrate `json:"dataset,omitempty"`
// external storage for the data migration
ExternalStorage *ExternalStorage `json:"externalStorage,omitempty"`
}
type DatasetToMigrate struct {
// name of dataset
Name string `json:"name"`
// namespace of dataset
Namespace string `json:"namespace"`
// path to migrate
Path string `json:"path,omitempty"`
}
type ExternalStorage struct {
// URI of the external storage, e.g. s3, oss, gcs, ceph, nfs, pvc, etc. (depending on the runtime)
URI string `json:"uri"`
// encrypt info for external storage
// +optional
EncryptOptions []EncryptOption `json:"encryptOptions,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=`.metadata.creationTimestamp`
// +kubebuilder:printcolumn:name="Duration",type="string",JSONPath=`.status.duration`
// DataMigrate is the Schema for the datamigrates API
type DataMigrate struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec DataMigrateSpec `json:"spec,omitempty"`
Status OperationStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// DataMigrateList contains a list of DataMigrate
type DataMigrateList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []DataMigrate `json:"items"`
}
func init() {
SchemeBuilder.Register(&DataMigrate{}, &DataMigrateList{})
}
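For illustration, a minimal DataMigrate manifest exercising these fields might look as follows; the names, bucket URI, and Secret are hypothetical, and a JuiceFS runtime is assumed to be bound to the destination dataset:

apiVersion: data.fluid.io/v1alpha1
kind: DataMigrate
metadata:
  name: migrate-demo
  namespace: default
spec:
  policy: Once                # one-shot migration; Cron would also require .spec.schedule
  runtimeType: juicefs        # optional; defaults to the dataset's bound runtime
  block: true                 # lock the dataset (phase: DataMigrating) while the job runs
  from:
    externalStorage:
      uri: s3://demo-bucket/data/         # hypothetical source bucket
      encryptOptions:
        - name: AWS_ACCESS_KEY_ID         # injected into the migrate job from a Secret
          valueFrom:
            secretKeyRef:
              name: s3-credentials        # hypothetical Secret name
              key: access-key
  to:
    dataset:
      name: demo-dataset                  # hypothetical dataset bound to a JuiceFSRuntime
      namespace: default
      path: /demo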

View File

@@ -17,10 +17,11 @@ limitations under the License.
package v1alpha1
import (
// "github.com/rook/rook/pkg/apis/rook.io/v1"
"github.com/fluid-cloudnative/fluid/pkg/common"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// "github.com/rook/rook/pkg/apis/rook.io/v1"
"github.com/fluid-cloudnative/fluid/pkg/common"
)
// DatasetPhase indicates whether the loading is behaving
@@ -37,6 +38,8 @@ const (
NotBoundDatasetPhase DatasetPhase = "NotBound"
// updating dataset, can't be released
UpdatingDatasetPhase DatasetPhase = "Updating"
// migrating dataset, can't be mounted
DataMigrating DatasetPhase = "DataMigrating"
// the dataset have no phase and need to be judged
NoneDatasetPhase DatasetPhase = ""
)
@@ -210,6 +213,10 @@ type DatasetStatus struct {
// This is mainly used as a lock to prevent concurrent DataLoad jobs.
DataLoadRef string `json:"dataLoadRef,omitempty"`
// DataMigrateRef specifies the running DataMigrate job that targets this Dataset.
// This is mainly used as a lock to prevent concurrent DataMigrate jobs.
DataMigrateRef string `json:"dataMigrateRef,omitempty"`
// DataBackupRef specifies the running Backup job that targets this Dataset.
// This is mainly used as a lock to prevent concurrent DataBackup jobs.
DataBackupRef string `json:"dataBackupRef,omitempty"`

View File

@@ -44,12 +44,17 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataLoad": schema_fluid_cloudnative_fluid_api_v1alpha1_DataLoad(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataLoadList": schema_fluid_cloudnative_fluid_api_v1alpha1_DataLoadList(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataLoadSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_DataLoadSpec(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataMigrate": schema_fluid_cloudnative_fluid_api_v1alpha1_DataMigrate(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataMigrateList": schema_fluid_cloudnative_fluid_api_v1alpha1_DataMigrateList(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataMigrateSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_DataMigrateSpec(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataRestoreLocation": schema_fluid_cloudnative_fluid_api_v1alpha1_DataRestoreLocation(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataToMigrate": schema_fluid_cloudnative_fluid_api_v1alpha1_DataToMigrate(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.Dataset": schema_fluid_cloudnative_fluid_api_v1alpha1_Dataset(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DatasetCondition": schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetCondition(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DatasetList": schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetList(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DatasetSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetSpec(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DatasetStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetStatus(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DatasetToMigrate": schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetToMigrate(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.EACCompTemplateSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_EACCompTemplateSpec(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.EACFuseSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_EACFuseSpec(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.EACRuntime": schema_fluid_cloudnative_fluid_api_v1alpha1_EACRuntime(ref),
@@ -57,6 +62,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
"github.com/fluid-cloudnative/fluid/api/v1alpha1.EACRuntimeSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_EACRuntimeSpec(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.EncryptOption": schema_fluid_cloudnative_fluid_api_v1alpha1_EncryptOption(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.EncryptOptionSource": schema_fluid_cloudnative_fluid_api_v1alpha1_EncryptOptionSource(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.ExternalStorage": schema_fluid_cloudnative_fluid_api_v1alpha1_ExternalStorage(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.GooseFSCompTemplateSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_GooseFSCompTemplateSpec(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.GooseFSFuseSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_GooseFSFuseSpec(ref),
"github.com/fluid-cloudnative/fluid/api/v1alpha1.GooseFSRuntime": schema_fluid_cloudnative_fluid_api_v1alpha1_GooseFSRuntime(ref),
@@ -1127,6 +1133,204 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DataLoadSpec(ref common.Referen
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_DataMigrate(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
Description: "DataMigrate is the Schema for the datamigrates API",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"kind": {
SchemaProps: spec.SchemaProps{
Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds",
Type: []string{"string"},
Format: "",
},
},
"apiVersion": {
SchemaProps: spec.SchemaProps{
Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources",
Type: []string{"string"},
Format: "",
},
},
"metadata": {
SchemaProps: spec.SchemaProps{
Default: map[string]interface{}{},
Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"),
},
},
"spec": {
SchemaProps: spec.SchemaProps{
Default: map[string]interface{}{},
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.DataMigrateSpec"),
},
},
"status": {
SchemaProps: spec.SchemaProps{
Default: map[string]interface{}{},
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationStatus"),
},
},
},
},
},
Dependencies: []string{
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataMigrateSpec", "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_DataMigrateList(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
Description: "DataMigrateList contains a list of DataMigrate",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"kind": {
SchemaProps: spec.SchemaProps{
Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds",
Type: []string{"string"},
Format: "",
},
},
"apiVersion": {
SchemaProps: spec.SchemaProps{
Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources",
Type: []string{"string"},
Format: "",
},
},
"metadata": {
SchemaProps: spec.SchemaProps{
Default: map[string]interface{}{},
Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"),
},
},
"items": {
SchemaProps: spec.SchemaProps{
Type: []string{"array"},
Items: &spec.SchemaOrArray{
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
Default: map[string]interface{}{},
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.DataMigrate"),
},
},
},
},
},
},
Required: []string{"items"},
},
},
Dependencies: []string{
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataMigrate", "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"},
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_DataMigrateSpec(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
Description: "DataMigrateSpec defines the desired state of DataMigrate",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"image": {
SchemaProps: spec.SchemaProps{
Description: "Image (e.g. alluxio/alluxio)",
Type: []string{"string"},
Format: "",
},
},
"imageTag": {
SchemaProps: spec.SchemaProps{
Description: "Image tag (e.g. 2.3.0-SNAPSHOT)",
Type: []string{"string"},
Format: "",
},
},
"imagePullPolicy": {
SchemaProps: spec.SchemaProps{
Description: "One of the three policies: `Always`, `IfNotPresent`, `Never`",
Type: []string{"string"},
Format: "",
},
},
"from": {
SchemaProps: spec.SchemaProps{
Description: "data to migrate source, including dataset and external storage",
Default: map[string]interface{}{},
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.DataToMigrate"),
},
},
"to": {
SchemaProps: spec.SchemaProps{
Description: "data to migrate destination, including dataset and external storage",
Default: map[string]interface{}{},
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.DataToMigrate"),
},
},
"block": {
SchemaProps: spec.SchemaProps{
Description: "if dataMigrate blocked dataset usage, default is false",
Type: []string{"boolean"},
Format: "",
},
},
"runtimeType": {
SchemaProps: spec.SchemaProps{
Description: "using which runtime to migrate data; if none, take dataset runtime as default",
Type: []string{"string"},
Format: "",
},
},
"options": {
SchemaProps: spec.SchemaProps{
Description: "options for migrate, different for each runtime",
Type: []string{"object"},
AdditionalProperties: &spec.SchemaOrBool{
Allows: true,
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
Default: "",
Type: []string{"string"},
Format: "",
},
},
},
},
},
"policy": {
SchemaProps: spec.SchemaProps{
Description: "policy for migrate, including None, Once, Cron, OnEvent",
Type: []string{"string"},
Format: "",
},
},
"schedule": {
SchemaProps: spec.SchemaProps{
Description: "The schedule in Cron format, only set when policy is cron, see https://en.wikipedia.org/wiki/Cron.",
Type: []string{"string"},
Format: "",
},
},
"podMetadata": {
SchemaProps: spec.SchemaProps{
Description: "PodMetadata defines labels and annotations that will be propagated to DataLoad pods",
Default: map[string]interface{}{},
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata"),
},
},
},
Required: []string{"from", "to"},
},
},
Dependencies: []string{
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DataToMigrate", "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata"},
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_DataRestoreLocation(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
@@ -1154,6 +1358,32 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DataRestoreLocation(ref common.
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_DataToMigrate(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
Type: []string{"object"},
Properties: map[string]spec.Schema{
"dataset": {
SchemaProps: spec.SchemaProps{
Description: "dataset to migrate",
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.DatasetToMigrate"),
},
},
"externalStorage": {
SchemaProps: spec.SchemaProps{
Description: "external storage for data migrate",
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.ExternalStorage"),
},
},
},
},
},
Dependencies: []string{
"github.com/fluid-cloudnative/fluid/api/v1alpha1.DatasetToMigrate", "github.com/fluid-cloudnative/fluid/api/v1alpha1.ExternalStorage"},
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_Dataset(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
@@ -1536,6 +1766,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetStatus(ref common.Refere
Format: "",
},
},
"dataMigrateRef": {
SchemaProps: spec.SchemaProps{
Description: "DataMigrateRef specifies the running DataMigrate job that targets this Dataset. This is mainly used as a lock to prevent concurrent DataMigrate jobs.",
Type: []string{"string"},
Format: "",
},
},
"dataBackupRef": {
SchemaProps: spec.SchemaProps{
Description: "DataBackupRef specifies the running Backup job that targets this Dataset. This is mainly used as a lock to prevent concurrent DataBackup jobs.",
@@ -1567,6 +1804,42 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetStatus(ref common.Refere
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetToMigrate(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
Type: []string{"object"},
Properties: map[string]spec.Schema{
"name": {
SchemaProps: spec.SchemaProps{
Description: "name of dataset",
Default: "",
Type: []string{"string"},
Format: "",
},
},
"namespace": {
SchemaProps: spec.SchemaProps{
Description: "namespace of dataset",
Default: "",
Type: []string{"string"},
Format: "",
},
},
"path": {
SchemaProps: spec.SchemaProps{
Description: "path to migrate",
Type: []string{"string"},
Format: "",
},
},
},
Required: []string{"name", "namespace"},
},
},
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_EACCompTemplateSpec(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
@@ -1977,6 +2250,43 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_EncryptOptionSource(ref common.
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_ExternalStorage(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
Type: []string{"object"},
Properties: map[string]spec.Schema{
"uri": {
SchemaProps: spec.SchemaProps{
Description: "type of external storage, including s3, oss, gcs, ceph, nfs, pvc, etc. (related to runtime)",
Default: "",
Type: []string{"string"},
Format: "",
},
},
"encryptOptions": {
SchemaProps: spec.SchemaProps{
Description: "encrypt info for external storage",
Type: []string{"array"},
Items: &spec.SchemaOrArray{
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
Default: map[string]interface{}{},
Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.EncryptOption"),
},
},
},
},
},
},
Required: []string{"uri"},
},
},
Dependencies: []string{
"github.com/fluid-cloudnative/fluid/api/v1alpha1.EncryptOption"},
}
}
func schema_fluid_cloudnative_fluid_api_v1alpha1_GooseFSCompTemplateSpec(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{

View File

@@ -507,6 +507,91 @@ func (in *DataLoadSpec) DeepCopy() *DataLoadSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataMigrate) DeepCopyInto(out *DataMigrate) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
in.Status.DeepCopyInto(&out.Status)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataMigrate.
func (in *DataMigrate) DeepCopy() *DataMigrate {
if in == nil {
return nil
}
out := new(DataMigrate)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *DataMigrate) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataMigrateList) DeepCopyInto(out *DataMigrateList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]DataMigrate, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataMigrateList.
func (in *DataMigrateList) DeepCopy() *DataMigrateList {
if in == nil {
return nil
}
out := new(DataMigrateList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *DataMigrateList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataMigrateSpec) DeepCopyInto(out *DataMigrateSpec) {
*out = *in
out.VersionSpec = in.VersionSpec
in.From.DeepCopyInto(&out.From)
in.To.DeepCopyInto(&out.To)
if in.Options != nil {
in, out := &in.Options, &out.Options
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
in.PodMetadata.DeepCopyInto(&out.PodMetadata)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataMigrateSpec.
func (in *DataMigrateSpec) DeepCopy() *DataMigrateSpec {
if in == nil {
return nil
}
out := new(DataMigrateSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataRestoreLocation) DeepCopyInto(out *DataRestoreLocation) {
*out = *in
@@ -522,6 +607,31 @@ func (in *DataRestoreLocation) DeepCopy() *DataRestoreLocation {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataToMigrate) DeepCopyInto(out *DataToMigrate) {
*out = *in
if in.DataSet != nil {
in, out := &in.DataSet, &out.DataSet
*out = new(DatasetToMigrate)
**out = **in
}
if in.ExternalStorage != nil {
in, out := &in.ExternalStorage, &out.ExternalStorage
*out = new(ExternalStorage)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataToMigrate.
func (in *DataToMigrate) DeepCopy() *DataToMigrate {
if in == nil {
return nil
}
out := new(DataToMigrate)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Dataset) DeepCopyInto(out *Dataset) {
*out = *in
@@ -715,6 +825,21 @@ func (in *DatasetStatus) DeepCopy() *DatasetStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DatasetToMigrate) DeepCopyInto(out *DatasetToMigrate) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatasetToMigrate.
func (in *DatasetToMigrate) DeepCopy() *DatasetToMigrate {
if in == nil {
return nil
}
out := new(DatasetToMigrate)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EACCompTemplateSpec) DeepCopyInto(out *EACCompTemplateSpec) {
*out = *in
@@ -900,6 +1025,26 @@ func (in *EncryptOptionSource) DeepCopy() *EncryptOptionSource {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ExternalStorage) DeepCopyInto(out *ExternalStorage) {
*out = *in
if in.EncryptOptions != nil {
in, out := &in.EncryptOptions, &out.EncryptOptions
*out = make([]EncryptOption, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExternalStorage.
func (in *ExternalStorage) DeepCopy() *ExternalStorage {
if in == nil {
return nil
}
out := new(ExternalStorage)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *GooseFSCompTemplateSpec) DeepCopyInto(out *GooseFSCompTemplateSpec) {
*out = *in

View File

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -0,0 +1,3 @@
### 0.1.0
- Support juicefs sync

View File

@@ -0,0 +1,23 @@
apiVersion: v2
name: fluid-datamigrate
description: A Helm chart for Fluid to migrate data
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
appVersion: 0.1.0

View File

@@ -0,0 +1,24 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ printf "%s-script" .Release.Name }}
labels:
release: {{ .Release.Name }}
role: datamigrate-job
data:
datamigrate.sh: |
#!/bin/bash
set -xe
function main() {
echo "juicefs datamigrate job start..."
if [ "$EDITION" = "community" ]
then
timeout $TIMEOUT /usr/local/bin/juicefs sync {{ .Values.datamigrate.migrateFrom }} {{ .Values.datamigrate.migrateTo }} $OPTION
else
timeout $TIMEOUT /usr/bin/juicefs sync {{ .Values.datamigrate.migrateFrom }} {{ .Values.datamigrate.migrateTo }} $OPTION
fi
echo "juicefs datamigrate job end."
}
main "$@"

View File

@@ -0,0 +1,81 @@
apiVersion: batch/v1
kind: Job
metadata:
name: {{ printf "%s-migrate" .Release.Name }}
labels:
release: {{ .Release.Name }}
role: datamigrate-job
app: juicefs
targetDataset: {{ required "targetDataset should be set" .Values.datamigrate.targetDataset }}
spec:
backoffLimit: {{ .Values.datamigrate.backoffLimit | default "3" }}
completions: 1
parallelism: 1
template:
metadata:
name: {{ printf "%s-migrate" .Release.Name }}
annotations:
sidecar.istio.io/inject: "false"
{{- if .Values.datamigrate.annotations }}
{{- range $key, $val := .Values.datamigrate.annotations }}
{{ $key | quote }}: {{ $val | quote }}
{{- end }}
{{- end }}
labels:
release: {{ .Release.Name }}
role: datamigrate-pod
app: juicefs
targetDataset: {{ required "targetDataset should be set" .Values.datamigrate.targetDataset }}
{{- if .Values.datamigrate.labels }}
{{- range $key, $val := .Values.datamigrate.labels }}
{{ $key | quote }}: {{ $val | quote }}
{{- end }}
{{- end }}
spec:
restartPolicy: Never
{{- with .Values.datamigrate.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
containers:
- name: datamigrate
image: {{ required "DataMigrate image should be set" .Values.datamigrate.image }}
imagePullPolicy: IfNotPresent
command: ["/bin/sh", "-c"]
args: ["/scripts/juicefs_datamigrate.sh"]
env:
{{- range $key, $val := .Values.datamigrate.options }}
{{- if eq $key "timeout" }}
- name: TIMEOUT
value: {{ $val | quote }}
{{- end }}
{{- if eq $key "edition" }}
- name: EDITION
value: {{ $val | quote }}
{{- end }}
{{- if eq $key "option" }}
- name: OPTION
value: {{ $val | quote }}
{{- end }}
{{- end }}
{{- range .Values.datamigrate.encryptOptions }}
- name: {{ .name }}
valueFrom:
secretKeyRef:
name: {{ .valueFrom.secretKeyRef.name }}
key: {{ .valueFrom.secretKeyRef.key }}
{{- end }}
envFrom:
- configMapRef:
name: {{ required "targetDataset should be set" .Values.datamigrate.targetDataset }}-juicefs-values
volumeMounts:
- mountPath: /scripts
name: data-migrate-script
volumes:
- name: data-migrate-script
configMap:
name: {{ printf "%s-script" .Release.Name }}
items:
- key: datamigrate.sh
path: juicefs_datamigrate.sh
mode: 365
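(mode 365 is decimal for octal 0555, which makes the mounted script executable.) As a sketch of how the env block above is fed, a hypothetical datamigrate.options map in the chart values would render into the TIMEOUT/EDITION/OPTION variables consumed by datamigrate.sh:

datamigrate:
  options:
    edition: community      # rendered into EDITION; selects the CLI path in datamigrate.sh
    timeout: 30m            # rendered into TIMEOUT for the timeout(1) wrapper
    option: "--threads=10"  # rendered into OPTION and appended verbatim to `juicefs sync` (flag value is hypothetical)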

View File

@@ -0,0 +1,45 @@
# Default values for fluid-datamigrate.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
datamigrate:
# Optional
# Default: 3
# Description: how many times the migrate job can fail, i.e. `Job.spec.backoffLimit`
backoffLimit: 3
# Required
# Description: the dataset that this DataMigrate targets
targetDataset: #imagenet
# Required
# Description: the source storage
migrateFrom: #<source-storage>
# Required
# Description: the destination storage
migrateTo: #<target-filesystem>
# Optional
# Description: the secrets that contain credentials for the external storage
encryptOptions:
# Required
# Description: the image that the DataMigrate job uses
image: #<juicefs-image>
# Optional
# Description: optional parameters for the DataMigrate job
options:
# Optional
# Description: optional labels on DataMigrate pods
labels:
# Optional
# Description: optional annotations on DataMigrate pods
annotations:
# Optional
# Description: optional image pull secrets on DataMigrate pods
imagePullSecrets: []
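A filled-in sketch of these values with hypothetical names (in practice the DataMigrate controller renders this chart; it is not installed by hand):

datamigrate:
  targetDataset: demo-dataset
  migrateFrom: s3://demo-bucket/data/     # hypothetical source storage
  migrateTo: jfs://demo-dataset/demo/     # hypothetical JuiceFS destination
  image: juicedata/juicefs-fuse:nightly   # DefaultJuiceFSMigrateImage elsewhere in this commit
  options:
    edition: community
    timeout: 30m
  encryptOptions:
    - name: AWS_SECRET_ACCESS_KEY         # hypothetical credential injected from a Secret
      valueFrom:
        secretKeyRef:
          name: s3-credentials
          key: secret-key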

View File

@@ -0,0 +1,268 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.8.0
creationTimestamp: null
name: datamigrates.data.fluid.io
spec:
group: data.fluid.io
names:
kind: DataMigrate
listKind: DataMigrateList
plural: datamigrates
singular: datamigrate
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .status.phase
name: Phase
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .status.duration
name: Duration
type: string
name: v1alpha1
schema:
openAPIV3Schema:
description: DataMigrate is the Schema for the datamigrates API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: DataMigrateSpec defines the desired state of DataMigrate
properties:
block:
description: whether DataMigrate blocks dataset usage during migration; default is false
type: boolean
from:
description: the source of the data migration, either a dataset or external storage
properties:
dataset:
description: dataset to migrate
properties:
name:
description: name of dataset
type: string
namespace:
description: namespace of dataset
type: string
path:
description: path to migrate
type: string
required:
- name
- namespace
type: object
externalStorage:
description: external storage for the data migration
properties:
encryptOptions:
description: encrypt info for external storage
items:
properties:
name:
description: The name of encryptOption
type: string
valueFrom:
description: The valueFrom of encryptOption
properties:
secretKeyRef:
description: The encryptInfo obtained from secret
properties:
key:
description: The required key in the secret
type: string
name:
description: The name of required secret
type: string
type: object
type: object
type: object
type: array
uri:
description: URI of the external storage, e.g. s3, oss, gcs, ceph, nfs, pvc, etc. (depending on the runtime)
type: string
required:
- uri
type: object
type: object
image:
description: Image (e.g. alluxio/alluxio)
type: string
imagePullPolicy:
description: 'One of the three policies: `Always`, `IfNotPresent`,
`Never`'
type: string
imageTag:
description: Image tag (e.g. 2.3.0-SNAPSHOT)
type: string
options:
additionalProperties:
type: string
description: options for the migration, specific to each runtime
type: object
podMetadata:
description: PodMetadata defines labels and annotations that will be propagated to DataMigrate pods
properties:
annotations:
additionalProperties:
type: string
description: Annotations are annotations of pod specification
type: object
labels:
additionalProperties:
type: string
description: Labels are labels of pod specification
type: object
type: object
policy:
description: the migration policy, one of None, Once, Cron, OnEvent
type: string
runtimeType:
description: the runtime type to use for migrating data; defaults to the dataset's runtime if unset
type: string
schedule:
description: The schedule in Cron format, only set when policy is
cron, see https://en.wikipedia.org/wiki/Cron.
type: string
to:
description: the destination of the data migration, either a dataset or external storage
properties:
dataset:
description: dataset to migrate
properties:
name:
description: name of dataset
type: string
namespace:
description: namespace of dataset
type: string
path:
description: path to migrate
type: string
required:
- name
- namespace
type: object
externalStorage:
description: external storage for the data migration
properties:
encryptOptions:
description: encrypt info for external storage
items:
properties:
name:
description: The name of encryptOption
type: string
valueFrom:
description: The valueFrom of encryptOption
properties:
secretKeyRef:
description: The encryptInfo obtained from secret
properties:
key:
description: The required key in the secret
type: string
name:
description: The name of required secret
type: string
type: object
type: object
type: object
type: array
uri:
description: URI of the external storage, e.g. s3, oss, gcs, ceph, nfs, pvc, etc. (depending on the runtime)
type: string
required:
- uri
type: object
type: object
required:
- from
- to
type: object
status:
description: OperationStatus defines the observed state of operation
properties:
conditions:
description: Conditions consist of transition information on the operation's Phase
items:
description: Condition explains the transitions on phase
properties:
lastProbeTime:
description: LastProbeTime describes last time this condition
was updated.
format: date-time
type: string
lastTransitionTime:
description: LastTransitionTime describes last time the condition
transitioned from one status to another.
format: date-time
type: string
message:
description: Message is a human-readable message indicating
details about the transition
type: string
reason:
description: Reason for the condition's last transition
type: string
status:
description: Status of the condition, one of `True`, `False`
or `Unknown`
type: string
type:
description: Type of condition, either `Complete` or `Failed`
type: string
required:
- status
- type
type: object
type: array
duration:
description: Duration tells how much time was spent on the operation
type: string
infos:
additionalProperties:
type: string
description: Infos holds customized name-value pairs for the operation
type: object
phase:
description: Phase describes current phase of operation
type: string
required:
- conditions
- duration
- phase
type: object
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
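Once the migrate Job finishes, the operation framework would be expected to populate a status along these lines (illustrative values only):

status:
  phase: Complete
  duration: 2m30s
  conditions:
    - type: Complete
      status: "True"
      reason: JobComplete                                   # hypothetical reason string
      message: the data migrate job completed successfully  # hypothetical message
      lastProbeTime: "2023-03-01T02:45:41Z"
      lastTransitionTime: "2023-03-01T02:45:41Z"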

View File

@@ -412,6 +412,11 @@ spec:
this Dataset. This is mainly used as a lock to prevent concurrent
DataLoad jobs.
type: string
dataMigrateRef:
description: DataMigrateRef specifies the running DataMigrate job
that targets this Dataset. This is mainly used as a lock to prevent
concurrent DataMigrate jobs.
type: string
datasetRef:
description: DatasetRef specifies the datasets namespaced name mounting
this Dataset.

View File

@@ -36,6 +36,8 @@ rules:
- databackups/status
- datasets
- datasets/status
- datamigrates
- datamigrates/status
- alluxioruntimes
- alluxioruntimes/status
- jindoruntimes

View File

@@ -20,14 +20,6 @@ import (
"os"
"time"
"github.com/fluid-cloudnative/fluid"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
databackupctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/databackup"
dataloadctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/dataload"
datasetctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/dataset"
"github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/spf13/cobra"
zapOpt "go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -37,6 +29,16 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"github.com/fluid-cloudnative/fluid"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
databackupctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/databackup"
dataloadctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/dataload"
datamigratectl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/datamigrate"
datasetctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/dataset"
"github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
)
var (
@@ -137,6 +139,15 @@ func handle() {
os.Exit(1)
}
if err = (datamigratectl.NewDataMigrateReconciler(mgr.GetClient(),
ctrl.Log.WithName("datamigratectl").WithName("DataMigrate"),
mgr.GetScheme(),
mgr.GetEventRecorderFor("DataMigrate"),
)).SetupWithManager(mgr, controllerOptions); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DataMigrate")
os.Exit(1)
}
setupLog.Info("starting dataset-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running dataset-controller")

View File

@@ -0,0 +1,268 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.8.0
creationTimestamp: null
name: datamigrates.data.fluid.io
spec:
group: data.fluid.io
names:
kind: DataMigrate
listKind: DataMigrateList
plural: datamigrates
singular: datamigrate
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .status.phase
name: Phase
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .status.duration
name: Duration
type: string
name: v1alpha1
schema:
openAPIV3Schema:
description: DataMigrate is the Schema for the datamigrates API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: DataMigrateSpec defines the desired state of DataMigrate
properties:
block:
description: whether DataMigrate blocks dataset usage during migration; default is false
type: boolean
from:
description: the source of the data migration, either a dataset or external storage
properties:
dataset:
description: dataset to migrate
properties:
name:
description: name of dataset
type: string
namespace:
description: namespace of dataset
type: string
path:
description: path to migrate
type: string
required:
- name
- namespace
type: object
externalStorage:
description: external storage for the data migration
properties:
encryptOptions:
description: encrypt info for external storage
items:
properties:
name:
description: The name of encryptOption
type: string
valueFrom:
description: The valueFrom of encryptOption
properties:
secretKeyRef:
description: The encryptInfo obtained from secret
properties:
key:
description: The required key in the secret
type: string
name:
description: The name of required secret
type: string
type: object
type: object
type: object
type: array
uri:
description: URI of the external storage, e.g. s3, oss, gcs, ceph, nfs, pvc, etc. (depending on the runtime)
type: string
required:
- uri
type: object
type: object
image:
description: Image (e.g. alluxio/alluxio)
type: string
imagePullPolicy:
description: 'One of the three policies: `Always`, `IfNotPresent`,
`Never`'
type: string
imageTag:
description: Image tag (e.g. 2.3.0-SNAPSHOT)
type: string
options:
additionalProperties:
type: string
description: options for the migration, specific to each runtime
type: object
podMetadata:
description: PodMetadata defines labels and annotations that will be propagated to DataMigrate pods
properties:
annotations:
additionalProperties:
type: string
description: Annotations are annotations of pod specification
type: object
labels:
additionalProperties:
type: string
description: Labels are labels of pod specification
type: object
type: object
policy:
description: the migration policy, one of None, Once, Cron, OnEvent
type: string
runtimeType:
description: the runtime type to use for migrating data; defaults to the dataset's runtime if unset
type: string
schedule:
description: The schedule in Cron format, only set when policy is
cron, see https://en.wikipedia.org/wiki/Cron.
type: string
to:
description: the destination of the data migration, either a dataset or external storage
properties:
dataset:
description: dataset to migrate
properties:
name:
description: name of dataset
type: string
namespace:
description: namespace of dataset
type: string
path:
description: path to migrate
type: string
required:
- name
- namespace
type: object
externalStorage:
description: external storage for the data migration
properties:
encryptOptions:
description: encrypt info for external storage
items:
properties:
name:
description: The name of encryptOption
type: string
valueFrom:
description: The valueFrom of encryptOption
properties:
secretKeyRef:
description: The encryptInfo obtained from secret
properties:
key:
description: The required key in the secret
type: string
name:
description: The name of required secret
type: string
type: object
type: object
type: object
type: array
uri:
description: URI of the external storage, e.g. s3, oss, gcs, ceph, nfs, pvc, etc. (depending on the runtime)
type: string
required:
- uri
type: object
type: object
required:
- from
- to
type: object
status:
description: OperationStatus defines the observed state of operation
properties:
conditions:
description: Conditions consist of transition information on the operation's Phase
items:
description: Condition explains the transitions on phase
properties:
lastProbeTime:
description: LastProbeTime describes last time this condition
was updated.
format: date-time
type: string
lastTransitionTime:
description: LastTransitionTime describes last time the condition
transitioned from one status to another.
format: date-time
type: string
message:
description: Message is a human-readable message indicating
details about the transition
type: string
reason:
description: Reason for the condition's last transition
type: string
status:
description: Status of the condition, one of `True`, `False`
or `Unknown`
type: string
type:
description: Type of condition, either `Complete` or `Failed`
type: string
required:
- status
- type
type: object
type: array
duration:
description: Duration tells how much time was spent on the operation
type: string
infos:
additionalProperties:
type: string
description: Infos holds customized name-value pairs for the operation
type: object
phase:
description: Phase describes current phase of operation
type: string
required:
- conditions
- duration
- phase
type: object
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []

View File

@@ -412,6 +412,11 @@ spec:
this Dataset. This is mainly used as a lock to prevent concurrent
DataLoad jobs.
type: string
dataMigrateRef:
description: DataMigrateRef specifies the running DataMigrate job
that targets this Dataset. This is mainly used as a lock to prevent
concurrent DataMigrate jobs.
type: string
datasetRef:
description: DatasetRef specifies the datasets namespaced name mounting
this Dataset.

View File

@@ -12,6 +12,7 @@ resources:
- bases/data.fluid.io_thinruntimes.yaml
- bases/data.fluid.io_thinruntimeprofiles.yaml
- bases/data.fluid.io_eacruntimes.yaml
- bases/data.fluid.io_datamigrates.yaml
# +kubebuilder:scaffold:crdkustomizeresource
patchesStrategicMerge:
@@ -24,6 +25,7 @@ patchesStrategicMerge:
#- patches/webhook_in_thinruntimes.yaml
#- patches/webhook_in_ThinRuntimeProfiles.yaml
#- patches/webhook_in_eacruntimes.yaml
#- patches/webhook_in_datamigrates.yaml
# +kubebuilder:scaffold:crdkustomizewebhookpatch
# [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix.
@@ -35,6 +37,7 @@ patchesStrategicMerge:
#- patches/cainjection_in_thinruntimes.yaml
#- patches/cainjection_in_ThinRuntimeProfiles.yaml
#- patches/cainjection_in_eacruntimes.yaml
#- patches/cainjection_in_datamigrates.yaml
# +kubebuilder:scaffold:crdkustomizecainjectionpatch
# the following config is for teaching kustomize how to do kustomization for CRDs.

View File

@@ -0,0 +1,7 @@
# The following patch adds a directive for certmanager to inject CA into the CRD
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME)
name: datamigrates.data.fluid.io

View File

@@ -0,0 +1,16 @@
# The following patch enables a conversion webhook for the CRD
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: datamigrates.data.fluid.io
spec:
conversion:
strategy: Webhook
webhook:
clientConfig:
service:
namespace: system
name: webhook-service
path: /convert
conversionReviewVersions:
- v1

View File

@@ -0,0 +1,24 @@
# permissions for end users to edit datamigrates.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: datamigrate-editor-role
rules:
- apiGroups:
- data.fluid.io
resources:
- datamigrates
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- data.fluid.io
resources:
- datamigrates/status
verbs:
- get

View File

@@ -0,0 +1,20 @@
# permissions for end users to view datamigrates.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: datamigrate-viewer-role
rules:
- apiGroups:
- data.fluid.io
resources:
- datamigrates
verbs:
- get
- list
- watch
- apiGroups:
- data.fluid.io
resources:
- datamigrates/status
verbs:
- get

View File

@@ -0,0 +1,7 @@
apiVersion: data.fluid.io/v1alpha1
kind: DataMigrate
metadata:
name: datamigrate-sample
spec:
# Add fields here
foo: bar

View File

@@ -58,6 +58,14 @@ const (
FuseRecoverSucceed = "FuseRecoverSucceed"
RuntimeDeprecated = "RuntimeDeprecated"
DataMigrateCollision = "DataMigrateCollision"
DataMigrateJobStarted = "DataMigrateJobStarted"
DataMigrateJobFailed = "DataMigrateJobFailed"
DataMigrateJobComplete = "DataMigrateJobComplete"
)
type CacheStoreType string

View File

@@ -32,6 +32,8 @@ const (
DefaultJuiceFSRuntimeImage = "juicedata/juicefs-fuse:v1.0.0"
DefaultJuiceFSMigrateImage = "juicedata/juicefs-fuse:nightly"
JuiceFSCeMountPath = "/bin/mount.juicefs"
JuiceFSMountPath = "/sbin/mount.juicefs"
JuiceCeCliPath = "/usr/local/bin/juicefs"

View File

@@ -0,0 +1,249 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamigrate
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-logr/logr"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
cdatamigrate "github.com/fluid-cloudnative/fluid/pkg/datamigrate"
"github.com/fluid-cloudnative/fluid/pkg/ddc"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/jindo"
)
const controllerName string = "DataMigrateReconciler"
// DataMigrateReconciler reconciles a DataMigrate object
type DataMigrateReconciler struct {
Scheme *runtime.Scheme
engines map[string]base.Engine
mutex *sync.Mutex
*DataMigrateReconcilerImplement
}
// NewDataMigrateReconciler returns a DataMigrateReconciler
func NewDataMigrateReconciler(client client.Client,
log logr.Logger,
scheme *runtime.Scheme,
recorder record.EventRecorder) *DataMigrateReconciler {
r := &DataMigrateReconciler{
Scheme: scheme,
mutex: &sync.Mutex{},
engines: map[string]base.Engine{},
}
r.DataMigrateReconcilerImplement = NewDataMigrateReconcilerImplement(client, log, recorder)
return r
}
// +kubebuilder:rbac:groups=data.fluid.io,resources=datamigrates,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=data.fluid.io,resources=datamigrates/status,verbs=get;update;patch
// Reconcile reconciles the DataMigrate object
func (r *DataMigrateReconciler) Reconcile(context context.Context, req ctrl.Request) (ctrl.Result, error) {
ctx := cruntime.ReconcileRequestContext{
Context: context,
Log: r.Log.WithValues("datamigrate", req.NamespacedName),
Recorder: r.Recorder,
Client: r.Client,
Category: common.AccelerateCategory,
}
// 1. Get DataMigrate object
dataMigrate, err := utils.GetDataMigrate(r.Client, req.Name, req.Namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
ctx.Log.Info("DataMigrate not found")
return utils.NoRequeue()
} else {
ctx.Log.Error(err, "failed to get DataMigrate")
return utils.RequeueIfError(errors.Wrap(err, "failed to get DataMigrate info"))
}
}
targetDataMigrate := *dataMigrate
ctx.Log.V(1).Info("dataMigrate found", "detail", dataMigrate)
// 2. Reconcile deletion of the object if necessary
if utils.HasDeletionTimestamp(dataMigrate.ObjectMeta) {
return r.ReconcileDataMigrateDeletion(ctx, targetDataMigrate, r.engines, r.mutex)
}
// 3. get target dataset
targetDataset, err := utils.GetTargetDatasetOfMigrate(r.Client, targetDataMigrate)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
ctx.Log.Info("can't find target dataset", "dataMigrate", targetDataMigrate.Name)
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeNormal,
common.TargetDatasetNotFound,
"Target dataset not found")
return utils.RequeueAfterInterval(20 * time.Second)
}
// other error
ctx.Log.Error(err, "Failed to get the ddc dataset")
return utils.RequeueIfError(errors.Wrap(err, "Unable to get dataset"))
}
ctx.Dataset = targetDataset
ctx.NamespacedName = types.NamespacedName{
Name: targetDataset.Name,
Namespace: targetDataset.Namespace,
}
// 4. get the runtime
index, boundedRuntime := utils.GetRuntimeByCategory(targetDataset.Status.Runtimes, common.AccelerateCategory)
if index == -1 {
ctx.Log.Info("bounded runtime with Accelerate Category is not found on the target dataset", "targetDataset", targetDataset)
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeNormal,
common.RuntimeNotReady,
"Bounded accelerate runtime not ready")
return utils.RequeueAfterInterval(20 * time.Second)
}
if targetDataMigrate.Spec.RuntimeType != "" && targetDataMigrate.Spec.RuntimeType != boundedRuntime.Type {
err = fmt.Errorf("the runtime type of the target dataset is %s, but the runtime type of the dataMigrate is %s", boundedRuntime.Type, targetDataMigrate.Spec.RuntimeType)
return utils.RequeueIfError(errors.Wrap(err, "Unable to get ddc runtime"))
}
ctx.RuntimeType = boundedRuntime.Type
var fluidRuntime client.Object
switch ctx.RuntimeType {
case common.AlluxioRuntime:
fluidRuntime, err = utils.GetAlluxioRuntime(ctx.Client, boundedRuntime.Name, boundedRuntime.Namespace)
case common.JindoRuntime:
fluidRuntime, err = utils.GetJindoRuntime(ctx.Client, boundedRuntime.Name, boundedRuntime.Namespace)
ctx.RuntimeType = jindo.GetRuntimeType()
case common.GooseFSRuntime:
fluidRuntime, err = utils.GetGooseFSRuntime(ctx.Client, boundedRuntime.Name, boundedRuntime.Namespace)
case common.JuiceFSRuntime:
fluidRuntime, err = utils.GetJuiceFSRuntime(ctx.Client, boundedRuntime.Name, boundedRuntime.Namespace)
default:
ctx.Log.Error(fmt.Errorf("RuntimeNotSupported"), "The runtime is not supported yet", "runtime", boundedRuntime)
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeNormal,
common.RuntimeNotReady,
"Bounded accelerate runtime not supported")
}
if err != nil {
if utils.IgnoreNotFound(err) == nil {
ctx.Log.V(1).Info("The runtime is not found", "runtime", ctx.NamespacedName)
return ctrl.Result{}, nil
} else {
ctx.Log.Error(err, "Failed to get the ddc runtime")
return utils.RequeueIfError(errors.Wrap(err, "Unable to get ddc runtime"))
}
}
ctx.Runtime = fluidRuntime
ctx.Log.V(1).Info("get the runtime", "runtime", ctx.Runtime)
// 5. create or get engine
engine, err := r.GetOrCreateEngine(ctx)
if err != nil {
r.Recorder.Eventf(&targetDataMigrate, v1.EventTypeWarning, common.ErrorProcessDatasetReason, "Process dataMigrate error %v", err)
return utils.RequeueIfError(errors.Wrap(err, "Failed to create or get engine"))
}
// 6. add finalizer and requeue
if !utils.ContainsString(targetDataMigrate.ObjectMeta.GetFinalizers(), cdatamigrate.DataMigrateFinalizer) {
return r.addFinalizerAndRequeue(ctx, targetDataMigrate)
}
// 7. add owner and requeue
if !utils.ContainsOwners(targetDataMigrate.GetOwnerReferences(), targetDataset) {
return r.AddOwnerAndRequeue(ctx, targetDataMigrate, targetDataset)
}
return r.ReconcileDataMigrate(ctx, targetDataMigrate, engine)
}
// AddOwnerAndRequeue adds the target dataset as an owner reference on the DataMigrate and requeues
func (r *DataMigrateReconciler) AddOwnerAndRequeue(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate, targetDataset *datav1alpha1.Dataset) (ctrl.Result, error) {
targetDataMigrate.ObjectMeta.OwnerReferences = append(targetDataMigrate.GetOwnerReferences(), metav1.OwnerReference{
APIVersion: targetDataset.APIVersion,
Kind: targetDataset.Kind,
Name: targetDataset.Name,
UID: targetDataset.UID,
})
if err := r.Update(ctx, &targetDataMigrate); err != nil {
ctx.Log.Error(err, "Failed to add ownerreference", "StatusUpdateError", ctx)
return utils.RequeueIfError(err)
}
return utils.RequeueImmediately()
}
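// addFinalizerAndRequeue appends the DataMigrate finalizer and requeues unless the update already bumped the object's generation.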
func (r *DataMigrateReconciler) addFinalizerAndRequeue(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (ctrl.Result, error) {
targetDataMigrate.ObjectMeta.Finalizers = append(targetDataMigrate.ObjectMeta.Finalizers, cdatamigrate.DataMigrateFinalizer)
ctx.Log.Info("Add finalizer and requeue", "finalizer", cdatamigrate.DataMigrateFinalizer)
prevGeneration := targetDataMigrate.ObjectMeta.GetGeneration()
if err := r.Update(ctx, &targetDataMigrate); err != nil {
ctx.Log.Error(err, "failed to add finalizer to dataMigrate", "StatusUpdateError", err)
return utils.RequeueIfError(err)
}
return utils.RequeueImmediatelyUnlessGenerationChanged(prevGeneration, targetDataMigrate.ObjectMeta.GetGeneration())
}
// SetupWithManager sets up the controller with the given controller manager
func (r *DataMigrateReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&datav1alpha1.DataMigrate{}).
Complete(r)
}
// GetOrCreateEngine returns the engine cached for the given DataMigrate, creating and caching a new one if none exists
func (r *DataMigrateReconciler) GetOrCreateEngine(
ctx cruntime.ReconcileRequestContext) (engine base.Engine, err error) {
found := false
id := ddc.GenerateEngineID(ctx.NamespacedName)
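// The engine map is shared by concurrent reconciles, so guard it with the mutex to avoid building duplicate engines for the same dataset.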
r.mutex.Lock()
defer r.mutex.Unlock()
if engine, found = r.engines[id]; !found {
engine, err = ddc.CreateEngine(id,
ctx)
if err != nil {
return nil, err
}
r.engines[id] = engine
r.Log.V(1).Info("Put Engine to engine map")
} else {
r.Log.V(1).Info("Get Engine from engine map")
}
return engine, err
}
func (r *DataMigrateReconciler) ControllerName() string {
return controllerName
}


@ -0,0 +1,372 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamigrate
import (
"context"
"reflect"
"sync"
"time"
"github.com/go-logr/logr"
"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
cdatamigrate "github.com/fluid-cloudnative/fluid/pkg/datamigrate"
"github.com/fluid-cloudnative/fluid/pkg/ddc"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/helm"
)
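// DataMigrateReconcilerImplement bundles the client, logger and event recorder shared by the phase-specific reconcile methods below.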
type DataMigrateReconcilerImplement struct {
client.Client
Log logr.Logger
Recorder record.EventRecorder
}
func NewDataMigrateReconcilerImplement(client client.Client, log logr.Logger, recorder record.EventRecorder) *DataMigrateReconcilerImplement {
return &DataMigrateReconcilerImplement{
Client: client,
Log: log,
Recorder: recorder,
}
}
func (r *DataMigrateReconcilerImplement) ReconcileDataMigrateDeletion(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate, engines map[string]base.Engine, mutex *sync.Mutex) (ctrl.Result, error) {
log := ctx.Log.WithName("ReconcileDataMigrateDeletion")
// 1. Delete release if exists
releaseName := utils.GetDataMigrateReleaseName(targetDataMigrate.Name)
err := helm.DeleteReleaseIfExists(releaseName, targetDataMigrate.Namespace)
if err != nil {
log.Error(err, "can't delete release", "releaseName", releaseName)
return utils.RequeueIfError(err)
}
// 2. Release lock on target dataset if necessary
targetDataset, err := utils.GetTargetDatasetOfMigrate(r.Client, targetDataMigrate)
if utils.IgnoreNotFound(err) != nil {
// other error
ctx.Log.Error(err, "Failed to get the ddc dataset")
return utils.RequeueIfError(errors.Wrap(err, "Unable to get dataset"))
}
if targetDataset != nil {
err = r.releaseLockOnTargetDataset(ctx, targetDataMigrate, targetDataset)
if err != nil {
log.Error(err, "can't release lock on target dataset", "targetDataset", dataset.Name)
return utils.RequeueIfError(err)
}
}
// 3. delete engine
mutex.Lock()
defer mutex.Unlock()
id := ddc.GenerateEngineID(ctx.NamespacedName)
delete(engines, id)
// 4. remove finalizer
if utils.HasDeletionTimestamp(targetDataMigrate.ObjectMeta) {
targetDataMigrate.ObjectMeta.Finalizers = utils.RemoveString(targetDataMigrate.ObjectMeta.Finalizers, cdatamigrate.DataMigrateFinalizer)
if err := r.Update(ctx, &targetDataMigrate); err != nil {
log.Error(err, "failed to remove finalizer")
return utils.RequeueIfError(err)
}
log.Info("Finalizer is removed")
}
return utils.NoRequeue()
}
// ReconcileDataMigrate reconciles the DataMigrate according to its phase status
func (r *DataMigrateReconcilerImplement) ReconcileDataMigrate(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate, engine base.Engine) (ctrl.Result, error) {
log := ctx.Log.WithName("ReconcileDataMigrate")
log.V(1).Info("process the cDataMigrate", "cDataMigrate", targetDataMigrate)
// DataMigrate's phase transition: None -> Pending -> Executing -> Complete or Failed
switch targetDataMigrate.Status.Phase {
case common.PhaseNone:
return r.reconcileNoneDataMigrate(ctx, targetDataMigrate)
case common.PhasePending:
return r.reconcilePendingDataMigrate(ctx, targetDataMigrate, engine)
case common.PhaseExecuting:
return r.reconcileExecutingDataMigrate(ctx, targetDataMigrate, engine)
case common.PhaseComplete:
return r.reconcileCompleteDataMigrate(ctx, targetDataMigrate)
case common.PhaseFailed:
return r.reconcileFailedDataMigrate(ctx, targetDataMigrate)
default:
log.Info("Unknown DataMigrate phase, won't reconcile it")
}
return utils.NoRequeue()
}
// reconcileNoneDataMigrate reconciles DataMigrates that are in `common.PhaseNone` phase
func (r *DataMigrateReconcilerImplement) reconcileNoneDataMigrate(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (ctrl.Result, error) {
log := ctx.Log.WithName("reconcileNoneDataMigrate")
DataMigrateToUpdate := targetDataMigrate.DeepCopy()
DataMigrateToUpdate.Status.Phase = common.PhasePending
DataMigrateToUpdate.Status.Infos = map[string]string{}
if len(DataMigrateToUpdate.Status.Conditions) == 0 {
DataMigrateToUpdate.Status.Conditions = []datav1alpha1.Condition{}
}
DataMigrateToUpdate.Status.Duration = "Unfinished"
if err := r.Status().Update(context.TODO(), DataMigrateToUpdate); err != nil {
log.Error(err, "failed to update the cDataMigrate")
return utils.RequeueIfError(err)
}
log.V(1).Info("Update phase of the cDataMigrate to Pending successfully")
return utils.RequeueImmediately()
}
// reconcilePendingDataMigrate reconciles DataMigrates that are in `common.PhasePending` phase
func (r *DataMigrateReconcilerImplement) reconcilePendingDataMigrate(ctx cruntime.ReconcileRequestContext,
targetDataMigrate datav1alpha1.DataMigrate,
engine base.Engine) (ctrl.Result, error) {
log := ctx.Log.WithName("reconcilePendingDataMigrate")
targetDataSet := ctx.Dataset
// 1. Check that the DataMigrate and the target dataset are in the same namespace
if targetDataMigrate.Namespace != targetDataSet.Namespace {
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeWarning,
common.TargetDatasetNamespaceNotSame,
"DataMigrate(%s) namespace is not the same as the target dataset's",
targetDataMigrate.Name)
// Update DataMigrate's phase to Failed, and no requeue
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
DataMigrate, err := utils.GetDataMigrate(r.Client, targetDataMigrate.Name, targetDataMigrate.Namespace)
if err != nil {
return err
}
DataMigrateToUpdate := DataMigrate.DeepCopy()
DataMigrateToUpdate.Status.Phase = common.PhaseFailed
DataMigrateToUpdate.Status.Infos = map[string]string{}
if !reflect.DeepEqual(DataMigrateToUpdate.Status, DataMigrate.Status) {
if err := r.Status().Update(ctx, DataMigrateToUpdate); err != nil {
return err
}
}
return nil
})
if err != nil {
log.Error(err, "can't update DataMigrate's phase status to Failed")
return utils.RequeueIfError(err)
}
return utils.NoRequeue()
}
// 2. Check if there is any other DataMigrate in Executing phase (conflicting DataMigrate)
conflictDataMigrateRef := targetDataSet.Status.DataMigrateRef
myDataMigrateRef := utils.GetDataMigrateRef(targetDataMigrate.Name, targetDataMigrate.Namespace)
if len(conflictDataMigrateRef) != 0 && conflictDataMigrateRef != myDataMigrateRef {
log.V(1).Info("Found another DataMigrate that is in Executing phase, will backoff", "other DataMigrate", conflictDataMigrateRef)
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeNormal,
common.DataMigrateCollision,
"Found another DataMigrate(%s) that is in Executing phase, will backoff",
conflictDataMigrateRef)
return utils.RequeueAfterInterval(20 * time.Second)
}
// 3. Check if the bounded runtime is ready
ready := engine.CheckRuntimeReady()
if !ready {
log.V(1).Info("Bounded accelerate runtime not ready", "targetDataset", targetDataSet)
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeNormal,
common.RuntimeNotReady,
"Bounded accelerate runtime not ready")
return utils.RequeueAfterInterval(20 * time.Second)
}
// 4. lock the target dataset and update dataset phase to DataMigrating.
// Make sure only one DataMigrate can win the lock and
// the losers have to requeue and go through the whole reconciliation loop.
log.Info("No conflicts detected, try to lock the target dataset and update DataSet's status to DataMigrating")
datasetToUpdate := targetDataSet.DeepCopy()
datasetToUpdate.Status.DataMigrateRef = myDataMigrateRef
datasetToUpdate.Status.Phase = datav1alpha1.DataMigrating
if !reflect.DeepEqual(targetDataSet.Status, datasetToUpdate.Status) {
if err := r.Client.Status().Update(context.TODO(), datasetToUpdate); err != nil {
log.V(1).Info("fail to get target dataset's lock, will requeue")
//todo(xuzhihao): random backoff
return utils.RequeueAfterInterval(20 * time.Second)
}
}
// 5. update phase to Executing
// We offload the helm install logic to `reconcileExecutingDataMigrate` to
// avoid such a case that status.phase change successfully first but helm install failed,
// where the DataMigrate job will never start and all other DataMigrates will be blocked forever.
log.Info("Get lock on target dataset, try to update phase")
DataMigrateToUpdate := targetDataMigrate.DeepCopy()
DataMigrateToUpdate.Status.Phase = common.PhaseExecuting
if err := r.Client.Status().Update(context.TODO(), DataMigrateToUpdate); err != nil {
log.Error(err, "failed to update DataMigrate's status to Executing, will retry")
return utils.RequeueIfError(err)
}
log.V(1).Info("update DataMigrate's status to Executing successfully")
return utils.RequeueImmediately()
}
// reconcileExecutingDataMigrate reconciles DataMigrates that are in `Executing` phase
func (r *DataMigrateReconcilerImplement) reconcileExecutingDataMigrate(ctx cruntime.ReconcileRequestContext,
targetDataMigrate datav1alpha1.DataMigrate,
engine base.Engine) (ctrl.Result, error) {
log := ctx.Log.WithName("reconcileExecutingDataMigrate")
// 1. Install the helm chart if not exists and requeue
err := engine.MigrateData(ctx, targetDataMigrate)
if err != nil {
log.Error(err, "engine migrate data failed")
return utils.RequeueIfError(err)
}
// 2. Check running status of the DataMigrate job
releaseName := utils.GetDataMigrateReleaseName(targetDataMigrate.Name)
jobName := utils.GetDataMigrateJobName(releaseName)
log.V(1).Info("DataMigrate chart already existed, check its running status")
job, err := utils.GetDataMigrateJob(r.Client, jobName, ctx.Namespace)
if err != nil {
// helm release found but job missing, delete the helm release and requeue
if utils.IgnoreNotFound(err) == nil {
log.Info("Related Job missing, will delete helm chart and retry", "namespace", ctx.Namespace, "jobName", jobName)
if err = helm.DeleteReleaseIfExists(releaseName, ctx.Namespace); err != nil {
log.Error(err, "can't delete DataMigrate release", "namespace", ctx.Namespace, "releaseName", releaseName)
return utils.RequeueIfError(err)
}
return utils.RequeueAfterInterval(20 * time.Second)
}
// other error
log.Error(err, "can't get DataMigrate job", "namespace", ctx.Namespace, "jobName", jobName)
return utils.RequeueIfError(err)
}
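// Note: the check below assumes the Job's first recorded condition is the terminal one (Complete or Failed).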
if len(job.Status.Conditions) != 0 {
if job.Status.Conditions[0].Type == batchv1.JobFailed ||
job.Status.Conditions[0].Type == batchv1.JobComplete {
// job either failed or complete, update DataMigrate's phase status
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
DataMigrate, err := utils.GetDataMigrate(r.Client, targetDataMigrate.Name, targetDataMigrate.Namespace)
if err != nil {
return err
}
DataMigrateToUpdate := DataMigrate.DeepCopy()
jobCondition := job.Status.Conditions[0]
DataMigrateToUpdate.Status.Conditions = []datav1alpha1.Condition{
{
Type: common.ConditionType(jobCondition.Type),
Status: jobCondition.Status,
Reason: jobCondition.Reason,
Message: jobCondition.Message,
LastProbeTime: jobCondition.LastProbeTime,
LastTransitionTime: jobCondition.LastTransitionTime,
},
}
if jobCondition.Type == batchv1.JobFailed {
DataMigrateToUpdate.Status.Phase = common.PhaseFailed
} else {
DataMigrateToUpdate.Status.Phase = common.PhaseComplete
}
DataMigrateToUpdate.Status.Duration = utils.CalculateDuration(DataMigrateToUpdate.CreationTimestamp.Time, jobCondition.LastTransitionTime.Time)
if !reflect.DeepEqual(DataMigrateToUpdate.Status, DataMigrate.Status) {
if err := r.Status().Update(ctx, DataMigrateToUpdate); err != nil {
return err
}
}
return nil
})
if err != nil {
log.Error(err, "can't update DataMigrate's phase status to Failed")
return utils.RequeueIfError(err)
}
return utils.RequeueImmediately()
}
}
log.V(1).Info("DataMigrate job still running", "namespace", ctx.Namespace, "jobName", jobName)
return utils.RequeueAfterInterval(20 * time.Second)
}
// reconcileCompleteDataMigrate reconciles DataMigrates that are in `common.PhaseComplete` phase
func (r *DataMigrateReconcilerImplement) reconcileCompleteDataMigrate(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (ctrl.Result, error) {
log := ctx.Log.WithName("reconcileCompleteDataMigrate")
// 1. release lock on target dataset
err := r.releaseLockOnTargetDataset(ctx, targetDataMigrate, ctx.Dataset)
if err != nil {
log.Error(err, "can't release lock on target dataset", "targetDataset", ctx.Dataset)
return utils.RequeueIfError(err)
}
// 2. record event and no requeue
log.Info("DataMigrate complete, no need to requeue")
jobName := utils.GetDataMigrateJobName(utils.GetDataMigrateReleaseName(targetDataMigrate.Name))
r.Recorder.Eventf(&targetDataMigrate, v1.EventTypeNormal, common.DataMigrateJobComplete, "DataMigrate job %s succeeded", jobName)
return utils.NoRequeue()
}
// reconcileFailedDataMigrate reconciles DataMigrates that are in `common.PhaseFailed` phase
func (r *DataMigrateReconcilerImplement) reconcileFailedDataMigrate(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (ctrl.Result, error) {
log := ctx.Log.WithName("reconcileFailedDataMigrate")
// 1. release lock on target dataset
err := r.releaseLockOnTargetDataset(ctx, targetDataMigrate, ctx.Dataset)
if err != nil {
log.Error(err, "can't release lock on target dataset", "targetDataset", ctx.Dataset)
return utils.RequeueIfError(err)
}
// 2. record event and no requeue
log.Info("DataMigrate failed, won't requeue")
jobName := utils.GetDataMigrateJobName(utils.GetDataMigrateReleaseName(targetDataMigrate.Name))
r.Recorder.Eventf(&targetDataMigrate, v1.EventTypeNormal, common.DataMigrateJobFailed, "DataMigrate job %s failed", jobName)
return utils.NoRequeue()
}
// releaseLockOnTargetDataset releases lock on target dataset if the lock currently belongs to reconciling DataMigrate.
// We use a key-value pair on the target dataset's status as the lock. To release the lock, we can simply set the value to empty.
func (r *DataMigrateReconcilerImplement) releaseLockOnTargetDataset(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate, targetDataSet *datav1alpha1.Dataset) error {
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if targetDataSet.Status.DataMigrateRef != utils.GetDataMigrateRef(targetDataMigrate.Name, targetDataMigrate.Namespace) {
r.Log.Info("Found DataMigrateRef inconsistent with the reconciling DataMigrate, won't release this lock, ignore it", "DataMigrateRef", targetDataSet.Status.DataMigrateRef)
return nil
}
datasetToUpdate := targetDataSet.DeepCopy()
datasetToUpdate.Status.DataMigrateRef = ""
datasetToUpdate.Status.Phase = datav1alpha1.BoundDatasetPhase
if !reflect.DeepEqual(datasetToUpdate.Status, targetDataSet.Status) {
if err := r.Status().Update(ctx, datasetToUpdate); err != nil {
return err
}
}
return nil
})
return err
}


@ -1,3 +1,19 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package juicefs
import (


@ -0,0 +1,22 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamigrate
const (
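// DataMigrateFinalizer guards cleanup before a DataMigrate is deleted; DataMigrateChart names the helm chart installed per DataMigrate job.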
DataMigrateFinalizer = "fluid-datamigrate-controller-finalizer"
DataMigrateChart = "fluid-datamigrate"
)

pkg/datamigrate/value.go

@ -0,0 +1,59 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamigrate
import (
corev1 "k8s.io/api/core/v1"
"github.com/fluid-cloudnative/fluid/api/v1alpha1"
)
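// DataMigrateValue is the root structure marshaled into the values file passed to the DataMigrate helm chart.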
type DataMigrateValue struct {
DataMigrateInfo DataMigrateInfo `json:"datamigrate"`
}
type DataMigrateInfo struct {
// BackoffLimit specifies the upper limit times when the DataMigrate job fails
BackoffLimit int32 `json:"backoffLimit,omitempty"`
// TargetDataset specifies the dataset that the DataMigrate targets
TargetDataset string `json:"targetDataset,omitempty"`
// MigrateFrom specifies the source that the DataMigrate migrates from
MigrateFrom string `json:"migrateFrom,omitempty"`
// MigrateTo specifies the destination that the DataMigrate migrates to
MigrateTo string `json:"migrateTo,omitempty"`
// EncryptOptions specifies the encrypt options that the DataMigrate job uses
EncryptOptions []v1alpha1.EncryptOption `json:"encryptOptions,omitempty"`
// Image specifies the image that the DataMigrate job uses
Image string `json:"image,omitempty"`
// Options specifies the extra dataMigrate properties for runtime
Options map[string]string `json:"options,omitempty"`
// Labels defines labels in DataMigrate's pod metadata
Labels map[string]string `json:"labels,omitempty"`
// Annotations defines annotations in DataMigrate's pod metadata
Annotations map[string]string `json:"annotations,omitempty"`
// ImagePullSecrets specifies the image pull secrets for the DataMigrate job's pods
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
}


@ -0,0 +1,29 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package alluxio
import (
"fmt"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
)
func (e *AlluxioEngine) CreateDataMigrateJob(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) error {
// TODO implement me
return fmt.Errorf("not implemented yet")
}


@ -46,6 +46,9 @@ type Engine interface {
// Dataloader
Dataloader
// Datamigrater
Datamigrater
}
type Dataloader interface {
@ -59,6 +62,11 @@ type Dataloader interface {
CheckExistenceOfPath(targetDataload datav1alpha1.DataLoad) (notExist bool, err error)
}
type Datamigrater interface {
// MigrateData generate datamigrate values and install helm chart
MigrateData(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (err error)
}
// Implement is what the real engine should implement if it use the TemplateEngine
type Implement interface {
UnderFileSystemService
@ -132,6 +140,9 @@ type Implement interface {
// CreateDataLoadJob creates the job to load data
CreateDataLoadJob(ctx cruntime.ReconcileRequestContext, targetDataload datav1alpha1.DataLoad) error
// CreateDataMigrateJob creates the job to migrate data
CreateDataMigrateJob(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) error
// checks if the runtime is ready
CheckRuntimeReady() (ready bool)


@ -0,0 +1,30 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package base
import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
)
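// MigrateData delegates to the concrete engine's CreateDataMigrateJob, which renders the DataMigrate values and installs the helm chart.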
func (t *TemplateEngine) MigrateData(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (err error) {
if err = t.Implement.CreateDataMigrateJob(ctx, targetDataMigrate); err != nil {
t.Log.Error(err, "Failed to create the DataMigrate job.")
return err
}
return err
}


@ -5,12 +5,13 @@
package base
import (
reflect "reflect"
"reflect"
v1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
runtime "github.com/fluid-cloudnative/fluid/pkg/runtime"
utils "github.com/fluid-cloudnative/fluid/pkg/utils"
gomock "github.com/golang/mock/gomock"
"github.com/golang/mock/gomock"
"github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
)
// MockEngine is a mock of Engine interface.
@ -378,6 +379,13 @@ func (m *MockImplement) CreateDataLoadJob(ctx runtime.ReconcileRequestContext, t
return ret0
}
func (m *MockImplement) CreateDataMigrateJob(ctx runtime.ReconcileRequestContext, targetDataMigrate v1alpha1.DataMigrate) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateDataMigrateJob", ctx, targetDataMigrate)
ret0, _ := ret[0].(error)
return ret0
}
// CreateDataLoadJob indicates an expected call of CreateDataLoadJob.
func (mr *MockImplementMockRecorder) CreateDataLoadJob(ctx, targetDataload interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()


@ -21,9 +21,10 @@ import (
"os"
"time"
"k8s.io/apimachinery/pkg/types"
"github.com/fluid-cloudnative/fluid/pkg/metrics"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"


@ -0,0 +1,29 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eac
import (
"fmt"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
)
func (e *EACEngine) CreateDataMigrateJob(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) error {
// TODO implement me
return fmt.Errorf("not implemented yet")
}


@ -0,0 +1,29 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package goosefs
import (
"fmt"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
)
func (e *GooseFSEngine) CreateDataMigrateJob(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) error {
// TODO implement me
return fmt.Errorf("not implemented yet")
}


@ -0,0 +1,29 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package jindo
import (
"fmt"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
)
func (e *JindoEngine) CreateDataMigrateJob(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) error {
// TODO implement me
return fmt.Errorf("not implemented yet")
}


@ -0,0 +1,29 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package jindofsx
import (
"fmt"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
)
func (e *JindoFSxEngine) CreateDataMigrateJob(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) error {
// TODO implement me
return fmt.Errorf("not implemented yet")
}


@ -47,7 +47,11 @@ const (
JuiceSecretKey = "secret-key"
JuiceToken = "token"
MountPath = "mountpath"
Edition = "edition"
DefaultDataLoadTimeout = "30m"
MountPath = "mountpath"
Edition = "edition"
MetaurlSecret = "metaurlSecret"
MetaurlSecretKey = "metaurlSecretKey"
Name = "name"
DefaultDataLoadTimeout = "30m"
DefaultDataMigrateTimeout = "30m"
)


@ -12,51 +12,98 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/brahma-adshonor/gohook"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils/helm"
)
var valuesConfigMapData = `
fullnameOverride: test-dataset
cacheDirs:
"1":
path: /var/foo
type: hostPath
configs:
accesskeySecret: jfs-secret
accesskeySecretKey: accesskey
bucket: http://minio.default.svc.cluster.local:9000/minio/test2
formatCmd: /usr/local/bin/juicefs format --trash-days=0 --access-key=${ACCESS_KEY}
--secret-key=${SECRET_KEY} --storage=minio --bucket=http://minio.default.svc.cluster.local:9000/minio/test2
${METAURL} minio
metaurlSecret: jfs-secret
metaurlSecretKey: metaurl
name: minio
secretkeySecret: jfs-secret
secretkeySecretKey: secretkey
storage: minio
edition: community
image: juicedata/juicefs-csi-driver
imageTag: v0.11.0
imagePullPolicy: IfNotPresent
user: 0
group: 0
fsGroup: 0
fullnameOverride: jfsdemo
fuse:
prepare:
subPath: /dir1
name: pics
accesskeySecret: juicefs-secret
secretkeySecret: juicefs-secret
bucket: http://xx.xx.xx.xx/pics
metaurlSecret: juicefs-secret
storage: minio
image: juicedata/juicefs-csi-driver
nodeSelector:
fluid.io/f-fluid-test-dataset: "true"
imageTag: v0.11.0
mountPath: /runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse
cacheDir: /tmp/jfs-cache
hostMountPath: /runtime-mnt/juicefs/fluid/test-dataset
command: /bin/mount.juicefs redis://xx.xx.xx.xx:6379/1 /runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse -o metrics=0.0.0.0:9567,subdir=/dir1,cache-size=4096,free-space-ratio=0.1,cache-dir=/tmp/jfs-cache
statCmd: stat -c %i /runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse
enabled: true
command: /bin/mount.juicefs ${METAURL} /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse
-o attr-cache=7200,entry-cache=7200,metrics=0.0.0.0:9567,cache-size=1024,free-space-ratio=0.1,cache-dir=/var/foo,ro
criticalPod: true
worker:
cacheDir: /tmp/jfs-cache
enabled: true
hostMountPath: /runtime-mnt/juicefs/default/jfsdemo
hostNetwork: true
image: registry.cn-hangzhou.aliyuncs.com/juicefs/juicefs-fuse
imagePullPolicy: IfNotPresent
imageTag: v1.0.0-4.8.0
mountPath: /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse
nodeSelector:
fluid.io/f-default-jfsdemo: "true"
privileged: true
resources: {}
statCmd: stat -c %i /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse
volumeMounts:
- mountPath: /var/foo
name: cache-dir-1
volumes:
- hostPath:
path: /var/foo
type: DirectoryOrCreate
name: cache-dir-1
group: 0
image: registry.cn-hangzhou.aliyuncs.com/juicefs/juicefs-fuse
imagePullPolicy: IfNotPresent
imagePullSecrets: null
imageTag: v1.0.0-4.8.0
owner:
apiVersion: data.fluid.io/v1alpha1
blockOwnerDeletion: false
controller: true
enabled: true
kind: JuiceFSRuntime
name: jfsdemo
uid: 7bf75683-c4cd-4f18-9344-3adde1799250
placement: Exclusive
Events: <none>
runtimeIdentity:
name: jfsdemo
namespace: default
source: ${METAURL}
user: 0
worker:
command: /bin/mount.juicefs ${METAURL} /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse
-o cache-size=1024,free-space-ratio=0.1,cache-dir=/var/foo,ro,metrics=0.0.0.0:9567
hostNetwork: true
mountPath: /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse
privileged: true
resources: {}
statCmd: stat -c %i /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse
volumeMounts:
- mountPath: /var/foo
name: cache-dir-1
volumes:
- hostPath:
path: /var/foo
type: DirectoryOrCreate
name: cache-dir-1
`
func TestJuiceFSEngine_CreateDataLoadJob(t *testing.T) {


@ -0,0 +1,250 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package juicefs
import (
"fmt"
"net/url"
"os"
"strings"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/yaml"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
cdatamigrate "github.com/fluid-cloudnative/fluid/pkg/datamigrate"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/docker"
"github.com/fluid-cloudnative/fluid/pkg/utils/helm"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)
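// CreateDataMigrateJob makes sure the DataMigrate helm chart is installed, rendering its values file on first install.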
func (j *JuiceFSEngine) CreateDataMigrateJob(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (err error) {
log := ctx.Log.WithName("createDataMigrateJob")
// 1. Check if the helm release already exists
releaseName := utils.GetDataMigrateReleaseName(targetDataMigrate.Name)
jobName := utils.GetDataMigrateJobName(releaseName)
var existed bool
existed, err = helm.CheckRelease(releaseName, targetDataMigrate.Namespace)
if err != nil {
log.Error(err, "failed to check if release exists", "releaseName", releaseName, "namespace", targetDataMigrate.Namespace)
return err
}
// 2. install the helm chart if not exists
if !existed {
log.Info("DataMigrate job helm chart not installed yet, will install")
valueFileName, err := j.generateDataMigrateValueFile(ctx, targetDataMigrate)
if err != nil {
log.Error(err, "failed to generate dataload chart's value file")
return err
}
chartName := utils.GetChartsDirectory() + "/" + cdatamigrate.DataMigrateChart + "/" + common.JuiceFSRuntime
err = helm.InstallRelease(releaseName, targetDataMigrate.Namespace, valueFileName, chartName)
if err != nil {
log.Error(err, "failed to install datamigrate chart")
return err
}
log.Info("DataLoad job helm chart successfully installed", "namespace", targetDataMigrate.Namespace, "releaseName", releaseName)
ctx.Recorder.Eventf(&targetDataMigrate, corev1.EventTypeNormal, common.DataLoadJobStarted, "The DataMigrate job %s started", jobName)
}
return err
}
func (j *JuiceFSEngine) generateDataMigrateValueFile(r cruntime.ReconcileRequestContext, dataMigrate datav1alpha1.DataMigrate) (valueFileName string, err error) {
// 1. get the target dataset
targetDataset, err := utils.GetTargetDatasetOfMigrate(r.Client, dataMigrate)
if err != nil {
return "", err
}
j.Log.Info("target dataset", "dataset", targetDataset)
// 2. get info
imageName, imageTag := dataMigrate.Spec.Image, dataMigrate.Spec.ImageTag
if len(imageName) == 0 {
defaultImageInfo := strings.Split(common.DefaultJuiceFSMigrateImage, ":")
if len(defaultImageInfo) < 1 {
panic("invalid default dataload image!")
} else {
imageName = defaultImageInfo[0]
}
}
if len(imageTag) == 0 {
defaultImageInfo := strings.Split(common.DefaultJuiceFSMigrateImage, ":")
if len(defaultImageInfo) < 2 {
panic("invalid default datamigrate image!")
} else {
imageTag = defaultImageInfo[1]
}
}
image := fmt.Sprintf("%s:%s", imageName, imageTag)
imagePullSecrets := docker.GetImagePullSecretsFromEnv(common.EnvImagePullSecretsKey)
// 3. init dataMigrateInfo
dataMigrateInfo := cdatamigrate.DataMigrateInfo{
BackoffLimit: 3,
TargetDataset: targetDataset.Name,
EncryptOptions: []datav1alpha1.EncryptOption{},
Image: image,
Options: map[string]string{},
Labels: dataMigrate.Spec.PodMetadata.Labels,
Annotations: dataMigrate.Spec.PodMetadata.Annotations,
ImagePullSecrets: imagePullSecrets,
}
// 4. set options
timeout := dataMigrate.Spec.Options["timeout"]
delete(dataMigrate.Spec.Options, "timeout")
if timeout == "" {
timeout = DefaultDataMigrateTimeout
}
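// Each remaining option becomes a CLI flag, e.g. {"exclude": "4.png"} renders as --exclude=4.png, which the chart presumably forwards to the underlying sync command.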
options := []string{}
for k, v := range dataMigrate.Spec.Options {
if v != "" {
options = append(options, fmt.Sprintf("--%s=%s", k, v))
} else {
options = append(options, fmt.Sprintf("--%s", k))
}
}
dataMigrateInfo.Options["option"] = strings.Join(options, " ")
dataMigrateInfo.Options["timeout"] = timeout
// 5. set from & to
migrateFrom, err := j.genDataUrl(dataMigrate.Spec.From, &dataMigrateInfo)
if err != nil {
return "", err
}
migrateTo, err := j.genDataUrl(dataMigrate.Spec.To, &dataMigrateInfo)
if err != nil {
return "", err
}
dataMigrateInfo.MigrateFrom = migrateFrom
dataMigrateInfo.MigrateTo = migrateTo
j.Log.Info("dataMigrateInfo", "info", dataMigrateInfo)
dataMigrateValue := cdatamigrate.DataMigrateValue{
DataMigrateInfo: dataMigrateInfo,
}
// 6. create the value file
data, err := yaml.Marshal(dataMigrateValue)
if err != nil {
return
}
j.Log.Info("dataMigrate value", "value", string(data))
valueFile, err := os.CreateTemp(os.TempDir(), fmt.Sprintf("%s-%s-migrate-values.yaml", dataMigrate.Namespace, dataMigrate.Name))
if err != nil {
return
}
err = os.WriteFile(valueFile.Name(), data, 0400)
if err != nil {
return
}
return valueFile.Name(), nil
}
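// genDataUrl renders a DataToMigrate into a sync URL: a dataset becomes a jfs:// URL (the community edition injects the meta url via env), while an external storage keeps its URI with credential placeholders in the userinfo part.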
func (j *JuiceFSEngine) genDataUrl(data datav1alpha1.DataToMigrate, info *cdatamigrate.DataMigrateInfo) (dataUrl string, err error) {
if data.DataSet != nil {
fsInfo, err := GetFSInfoFromConfigMap(j.Client, data.DataSet.Name, data.DataSet.Namespace)
if err != nil {
return "", err
}
info.Options[Edition] = fsInfo[Edition]
if fsInfo[Edition] == CommunityEdition {
info.EncryptOptions = append(info.EncryptOptions, datav1alpha1.EncryptOption{
Name: "FLUID_METAURL",
ValueFrom: datav1alpha1.EncryptOptionSource{
SecretKeyRef: datav1alpha1.SecretKeySelector{
Name: fsInfo[MetaurlSecret],
Key: fsInfo[MetaurlSecretKey],
},
},
})
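// Use the FLUID_METAURL placeholder as the host so the real meta url, referenced from the secret above, never appears in the values file.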
u, err := url.Parse("jfs://FLUID_METAURL/")
if err != nil {
return "", err
}
u.Path = "/"
if data.DataSet.Path != "" {
u.Path = data.DataSet.Path
}
dataUrl = u.String()
} else {
u, err := url.Parse(fmt.Sprintf("jfs://%s/", fsInfo[Name]))
if err != nil {
return "", err
}
u.Path = "/"
if data.DataSet.Path != "" {
u.Path = data.DataSet.Path
}
dataUrl = u.String()
}
return dataUrl, nil
}
if data.ExternalStorage != nil {
u, err := url.Parse(data.ExternalStorage.URI)
if err != nil {
return "", err
}
var accessKey, secretKey, token string
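// Map the well-known credential options onto ${...} placeholders embedded in the URL; the referenced secrets are passed along via EncryptOptions so the placeholders can be resolved at job runtime.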
for _, encryptOption := range data.ExternalStorage.EncryptOptions {
name := encryptOption.Name
keyName := name
switch name {
case "access-key":
accessKey = "${ACCESS_KEY}"
keyName = "ACCESS_KEY"
case "secret-key":
secretKey = "${SECRET_KEY}"
keyName = "SECRET_KEY"
case "token":
token = "${TOKEN}"
keyName = "TOKEN"
}
secretKeyRef := encryptOption.ValueFrom.SecretKeyRef
_, err := kubeclient.GetSecret(j.Client, secretKeyRef.Name, j.namespace)
if err != nil {
j.Log.Info("can't get the secret",
"namespace", j.namespace,
"name", j.name,
"secretName", secretKeyRef.Name)
return "", err
}
info.EncryptOptions = append(info.EncryptOptions, datav1alpha1.EncryptOption{
Name: keyName,
ValueFrom: encryptOption.ValueFrom,
})
}
if token != "" {
secretKey = fmt.Sprintf("%s:%s", secretKey, token)
}
u.User = url.UserPassword(accessKey, secretKey)
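// url.UserPassword percent-encodes the ${ACCESS_KEY}-style placeholders, so unescape the result to restore them.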
decodedValue, _ := url.QueryUnescape(u.String())
dataUrl = decodedValue
j.Log.Info("dataUrl", "dataUrl", dataUrl)
return dataUrl, nil
}
return
}


@ -0,0 +1,491 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package juicefs
import (
"encoding/base64"
"errors"
"os"
"path/filepath"
"strings"
"testing"
"github.com/brahma-adshonor/gohook"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"github.com/fluid-cloudnative/fluid/api/v1alpha1"
cdatamigrate "github.com/fluid-cloudnative/fluid/pkg/datamigrate"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
"github.com/fluid-cloudnative/fluid/pkg/utils/helm"
)
func TestJuiceFSEngine_CreateDataMigrateJob(t *testing.T) {
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset-juicefs-values",
Namespace: "fluid",
},
Data: map[string]string{
"data": valuesConfigMapData,
},
}
mockExecCheckReleaseCommon := func(name string, namespace string) (exist bool, err error) {
return false, nil
}
mockExecCheckReleaseErr := func(name string, namespace string) (exist bool, err error) {
return false, errors.New("fail to check release")
}
mockExecInstallReleaseCommon := func(name string, namespace string, valueFile string, chartName string) error {
return nil
}
mockExecInstallReleaseErr := func(name string, namespace string, valueFile string, chartName string) error {
return errors.New("fail to install datamigrate chart")
}
wrappedUnhookCheckRelease := func() {
err := gohook.UnHook(helm.CheckRelease)
if err != nil {
t.Fatal(err.Error())
}
}
wrappedUnhookInstallRelease := func() {
err := gohook.UnHook(helm.InstallRelease)
if err != nil {
t.Fatal(err.Error())
}
}
targetDataMigrate := v1alpha1.DataMigrate{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "fluid",
},
Spec: v1alpha1.DataMigrateSpec{
From: v1alpha1.DataToMigrate{
DataSet: &v1alpha1.DatasetToMigrate{
Name: "test-dataset",
Namespace: "fluid",
},
},
To: v1alpha1.DataToMigrate{
ExternalStorage: &v1alpha1.ExternalStorage{
URI: "minio://test/test",
},
},
},
}
datasetInputs := []v1alpha1.Dataset{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "fluid",
},
},
}
podListInputs := []corev1.PodList{{
Items: []corev1.Pod{{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"},
},
}},
}}
testObjs := []runtime.Object{}
testObjs = append(testObjs, configMap)
for _, datasetInput := range datasetInputs {
testObjs = append(testObjs, datasetInput.DeepCopy())
}
for _, podInput := range podListInputs {
testObjs = append(testObjs, podInput.DeepCopy())
}
testScheme.AddKnownTypes(corev1.SchemeGroupVersion, configMap)
client := fake.NewFakeClientWithScheme(testScheme, testObjs...)
engine := &JuiceFSEngine{
name: "juicefs",
namespace: "fluid",
Client: client,
Log: fake.NullLogger(),
}
ctx := cruntime.ReconcileRequestContext{
Log: fake.NullLogger(),
Client: client,
Recorder: record.NewFakeRecorder(1),
}
err := gohook.Hook(helm.CheckRelease, mockExecCheckReleaseErr, nil)
if err != nil {
t.Fatal(err.Error())
}
err = engine.CreateDataMigrateJob(ctx, targetDataMigrate)
if err == nil {
t.Errorf("fail to catch the error: %v", err)
}
wrappedUnhookCheckRelease()
err = gohook.Hook(helm.CheckRelease, mockExecCheckReleaseCommon, nil)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(helm.InstallRelease, mockExecInstallReleaseErr, nil)
if err != nil {
t.Fatal(err.Error())
}
err = engine.CreateDataMigrateJob(ctx, targetDataMigrate)
if err == nil {
t.Errorf("fail to catch the error: %v", err)
}
wrappedUnhookInstallRelease()
err = gohook.Hook(helm.InstallRelease, mockExecInstallReleaseCommon, nil)
if err != nil {
t.Fatal(err.Error())
}
err = engine.CreateDataMigrateJob(ctx, targetDataMigrate)
if err != nil {
t.Errorf("fail to exec the function: %v", err)
}
wrappedUnhookCheckRelease()
}
func TestJuiceFSEngine_generateDataMigrateValueFile(t *testing.T) {
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset-juicefs-values",
Namespace: "fluid",
},
Data: map[string]string{
"data": ``,
},
}
datasetInputs := []v1alpha1.Dataset{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "fluid",
},
Spec: v1alpha1.DatasetSpec{},
},
}
testObjs := []runtime.Object{}
testObjs = append(testObjs, configMap)
for _, datasetInput := range datasetInputs {
testObjs = append(testObjs, datasetInput.DeepCopy())
}
client := fake.NewFakeClientWithScheme(testScheme, testObjs...)
context := cruntime.ReconcileRequestContext{
Client: client,
}
dataMigrateNoTarget := v1alpha1.DataMigrate{
ObjectMeta: metav1.ObjectMeta{
Name: "test-datamigrate",
Namespace: "fluid",
},
Spec: v1alpha1.DataMigrateSpec{
From: v1alpha1.DataToMigrate{
DataSet: &v1alpha1.DatasetToMigrate{
Name: "test-dataset",
Namespace: "fluid",
},
},
To: v1alpha1.DataToMigrate{
ExternalStorage: &v1alpha1.ExternalStorage{
URI: "minio://test/test",
},
},
},
}
dataMigrateWithTarget := v1alpha1.DataMigrate{
ObjectMeta: metav1.ObjectMeta{
Name: "test-datamigrate",
Namespace: "fluid",
},
Spec: v1alpha1.DataMigrateSpec{
From: v1alpha1.DataToMigrate{
DataSet: &v1alpha1.DatasetToMigrate{
Name: "test-dataset",
Namespace: "fluid",
Path: "/test/",
},
},
To: v1alpha1.DataToMigrate{
ExternalStorage: &v1alpha1.ExternalStorage{
URI: "minio://test/test",
},
},
Options: map[string]string{
"exclude": "4.png",
},
},
}
var testCases = []struct {
dataMigrate v1alpha1.DataMigrate
expectFileName string
}{
{
dataMigrate: dataMigrateNoTarget,
expectFileName: filepath.Join(os.TempDir(), "fluid-test-datamigrate-migrate-values.yaml"),
},
{
dataMigrate: dataMigrateWithTarget,
expectFileName: filepath.Join(os.TempDir(), "fluid-test-datamigrate-migrate-values.yaml"),
},
}
for _, test := range testCases {
engine := JuiceFSEngine{
name: "juicefs",
namespace: "fluid",
Client: client,
Log: fake.NullLogger(),
}
fileName, err := engine.generateDataMigrateValueFile(context, test.dataMigrate)
if err != nil {
t.Errorf("fail to generate the datamigrate value file: %v", err)
}
if !strings.Contains(fileName, test.expectFileName) {
t.Errorf("got value: %v, want value: %v", fileName, test.expectFileName)
}
}
}
func TestJuiceFSEngine_genDataUrl(t *testing.T) {
juicefsSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "juicefs-secret",
},
Data: map[string][]byte{
"access-key": []byte(base64.StdEncoding.EncodeToString([]byte("test"))),
"secret-key": []byte(base64.StdEncoding.EncodeToString([]byte("test"))),
"metaurl": []byte(base64.StdEncoding.EncodeToString([]byte("test"))),
},
}
juicefsConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-juicefs-values",
Namespace: "default",
},
Data: map[string]string{
"data": valuesConfigMapData,
},
}
testObjs := []runtime.Object{}
testObjs = append(testObjs, (*juicefsSecret).DeepCopy(), juicefsConfigMap)
client := fake.NewFakeClientWithScheme(testScheme, testObjs...)
type args struct {
data v1alpha1.DataToMigrate
info *cdatamigrate.DataMigrateInfo
}
tests := []struct {
name string
args args
wantDataUrl string
wantErr bool
}{
{
name: "test-external",
args: args{
data: v1alpha1.DataToMigrate{ExternalStorage: &v1alpha1.ExternalStorage{
URI: "http://minio/",
EncryptOptions: []v1alpha1.EncryptOption{
{
Name: "access-key",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "access-key",
}},
},
{
Name: "secret-key",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "secret-key",
}},
},
{
Name: "token",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "token",
}},
},
},
}},
info: &cdatamigrate.DataMigrateInfo{
EncryptOptions: []v1alpha1.EncryptOption{
{
Name: "access-key",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "access-key",
}},
},
{
Name: "secret-key",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "secret-key",
}},
},
{
Name: "token",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "token",
}},
},
},
Options: map[string]string{},
},
},
wantDataUrl: "http://${ACCESS_KEY}:${SECRET_KEY}:${TOKEN}@minio/",
wantErr: false,
},
{
name: "test-external-subpath",
args: args{
data: v1alpha1.DataToMigrate{ExternalStorage: &v1alpha1.ExternalStorage{
URI: "http://minio/test/",
EncryptOptions: []v1alpha1.EncryptOption{{
Name: "access-key",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "access-key",
}},
}},
}},
info: &cdatamigrate.DataMigrateInfo{
EncryptOptions: []v1alpha1.EncryptOption{{
Name: "access-key",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "access-key",
}},
}},
Options: map[string]string{},
},
},
wantDataUrl: "http://${ACCESS_KEY}:@minio/test/",
wantErr: false,
},
{
name: "test-external-subpath-file",
args: args{
data: v1alpha1.DataToMigrate{ExternalStorage: &v1alpha1.ExternalStorage{
URI: "http://minio/test",
EncryptOptions: []v1alpha1.EncryptOption{{
Name: "access-key",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "access-key",
}},
}},
}},
info: &cdatamigrate.DataMigrateInfo{
EncryptOptions: []v1alpha1.EncryptOption{{
Name: "access-key",
ValueFrom: v1alpha1.EncryptOptionSource{SecretKeyRef: v1alpha1.SecretKeySelector{
Name: "juicefs-secret",
Key: "access-key",
}},
}},
Options: map[string]string{},
},
},
wantDataUrl: "http://${ACCESS_KEY}:@minio/test",
wantErr: false,
},
{
name: "test-dataset",
args: args{
data: v1alpha1.DataToMigrate{
DataSet: &v1alpha1.DatasetToMigrate{
Name: "test",
Namespace: "default",
Path: "/subpath/",
},
},
info: &cdatamigrate.DataMigrateInfo{
EncryptOptions: []v1alpha1.EncryptOption{},
Options: map[string]string{},
},
},
wantDataUrl: "jfs://FLUID_METAURL/subpath/",
wantErr: false,
},
{
name: "test-dataset-no-path",
args: args{
data: v1alpha1.DataToMigrate{
DataSet: &v1alpha1.DatasetToMigrate{
Name: "test",
Namespace: "default",
},
},
info: &cdatamigrate.DataMigrateInfo{
EncryptOptions: []v1alpha1.EncryptOption{},
Options: map[string]string{},
},
},
wantDataUrl: "jfs://FLUID_METAURL/",
wantErr: false,
},
{
name: "test-dataset-subpath-file",
args: args{
data: v1alpha1.DataToMigrate{
DataSet: &v1alpha1.DatasetToMigrate{
Name: "test",
Namespace: "default",
Path: "/subpath",
},
},
info: &cdatamigrate.DataMigrateInfo{
EncryptOptions: []v1alpha1.EncryptOption{},
Options: map[string]string{},
},
},
wantDataUrl: "jfs://FLUID_METAURL/subpath",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
j := &JuiceFSEngine{
Client: client,
Log: fake.NullLogger(),
}
gotDataUrl, err := j.genDataUrl(tt.args.data, tt.args.info)
if (err != nil) != tt.wantErr {
t.Errorf("genDataUrl() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotDataUrl != tt.wantDataUrl {
t.Errorf("genDataUrl() gotDataUrl = %v, want %v", gotDataUrl, tt.wantDataUrl)
}
})
}
}


@ -57,3 +57,28 @@ func parseCacheInfoFromConfigMap(configMap *v1.ConfigMap) (cacheinfo map[string]
}
return configmapinfo, nil
}
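// GetFSInfoFromConfigMap reads the <name>-juicefs-values configmap and extracts the filesystem info (meta url secret, name, edition) needed to build migrate URLs.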
func GetFSInfoFromConfigMap(client client.Client, name string, namespace string) (info map[string]string, err error) {
configMapName := fmt.Sprintf("%s-juicefs-values", name)
configMap, err := kubeclient.GetConfigmapByName(client, configMapName, namespace)
if err != nil {
return nil, errors.Wrap(err, "GetConfigMapByName error when GetCacheInfoFromConfigmap")
}
return parseFSInfoFromConfigMap(configMap)
}
func parseFSInfoFromConfigMap(configMap *v1.ConfigMap) (info map[string]string, err error) {
var value JuiceFS
info = map[string]string{}
if v, ok := configMap.Data["data"]; ok {
if err = yaml.Unmarshal([]byte(v), &value); err != nil {
return
}
info[MetaurlSecret] = value.Configs.MetaUrlSecret
info[MetaurlSecretKey] = value.Configs.MetaUrlSecretKey
info[Name] = value.Configs.Name
info[Edition] = value.Edition
return
}
return
}


@ -20,11 +20,12 @@ import (
"reflect"
"testing"
"github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
)
func TestGetCacheInfoFromConfigmap(t *testing.T) {
@ -56,7 +57,7 @@ func TestGetCacheInfoFromConfigmap(t *testing.T) {
runtimeObjs = append(runtimeObjs, configMap)
runtimeObjs = append(runtimeObjs, dataSet.DeepCopy())
fakeClient := fake.NewFakeClientWithScheme(testScheme, runtimeObjs...)
wantCacheInfo := map[string]string{"mountpath": "/runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse", "edition": "community"}
wantCacheInfo := map[string]string{"mountpath": "/runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse", "edition": "community"}
cacheinfo, err := GetCacheInfoFromConfigmap(fakeClient, dataSet.Name, dataSet.Namespace)
if err != nil {
t.Errorf("GetCacheInfoFromConfigmap failed.")
@ -84,7 +85,7 @@ func Test_parseCacheInfoFromConfigMap(t *testing.T) {
"data": valuesConfigMapData,
},
}},
wantCacheInfo: map[string]string{"mountpath": "/runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse", "edition": "community"},
wantCacheInfo: map[string]string{"mountpath": "/runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse", "edition": "community"},
wantErr: false,
},
{
@ -111,3 +112,96 @@ func Test_parseCacheInfoFromConfigMap(t *testing.T) {
})
}
}
func TestGetFSInfoFromConfigMap(t *testing.T) {
configMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset-juicefs-values",
Namespace: "fluid",
},
Data: map[string]string{
"data": valuesConfigMapData,
},
}
dataSet := &v1alpha1.Dataset{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "fluid",
},
Status: v1alpha1.DatasetStatus{
Runtimes: []v1alpha1.Runtime{
{
Name: "test-dataset",
Namespace: "fluid",
Type: "juicefs",
},
},
},
}
runtimeObjs := []runtime.Object{}
runtimeObjs = append(runtimeObjs, configMap)
runtimeObjs = append(runtimeObjs, dataSet.DeepCopy())
fakeClient := fake.NewFakeClientWithScheme(testScheme, runtimeObjs...)
wantMetaurlInfo := map[string]string{MetaurlSecret: "jfs-secret", MetaurlSecretKey: "metaurl", Name: "minio", Edition: "community"}
metaurlInfo, err := GetFSInfoFromConfigMap(fakeClient, dataSet.Name, dataSet.Namespace)
if err != nil {
t.Errorf("GetMetaUrlInfoFromConfigMap failed.")
}
if !reflect.DeepEqual(metaurlInfo, wantMetaurlInfo) {
t.Errorf("parseCacheInfoFromConfigMap() gotMetaurlInfo = %v, want %v", metaurlInfo, wantMetaurlInfo)
}
}
func Test_parseFSInfoFromConfigMap(t *testing.T) {
type args struct {
configMap *v1.ConfigMap
}
tests := []struct {
name string
args args
wantMetaurlInfo map[string]string
wantErr bool
}{
{
name: "test",
args: args{
configMap: &v1.ConfigMap{
Data: map[string]string{
"data": valuesConfigMapData,
},
},
},
wantMetaurlInfo: map[string]string{
MetaurlSecret: "jfs-secret",
MetaurlSecretKey: "metaurl",
Name: "minio",
Edition: CommunityEdition,
},
wantErr: false,
},
{
name: "test-err",
args: args{
configMap: &v1.ConfigMap{
Data: map[string]string{
"data": "test",
},
},
},
wantMetaurlInfo: map[string]string{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotMetaurlInfo, err := parseFSInfoFromConfigMap(tt.args.configMap)
if (err != nil) != tt.wantErr {
t.Errorf("parseFSInfoFromConfigMap() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotMetaurlInfo, tt.wantMetaurlInfo) {
t.Errorf("parseFSInfoFromConfigMap() gotMetaurlInfo = %v, want %v", gotMetaurlInfo, tt.wantMetaurlInfo)
}
})
}
}


@ -19,9 +19,10 @@ package juicefs
import (
"fmt"
"k8s.io/apimachinery/pkg/types"
"github.com/fluid-cloudnative/fluid/pkg/ctrl"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"k8s.io/apimachinery/pkg/types"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"

View File

@ -0,0 +1,29 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package thin
import (
"fmt"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
)
func (t *ThinEngine) CreateDataMigrateJob(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) error {
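// ThinRuntime has no data-migrate implementation yet; returning an error here
// (rather than nil) lets the DataMigrate controller fail fast when a runtime
// does not implement migrate, per this commit's intent.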
//TODO implement me
return fmt.Errorf("not implemented yet")
}

View File

@ -19,10 +19,11 @@ package thin
import (
"fmt"
"github.com/fluid-cloudnative/fluid/pkg/ddc/thin/referencedataset"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/fluid-cloudnative/fluid/pkg/ddc/thin/referencedataset"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

View File

@ -0,0 +1,27 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package referencedataset
import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
)
func (e *ReferenceDatasetEngine) MigrateData(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (err error) {
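// Note: unlike ThinEngine's stub, this returns nil, so migration on a reference
// dataset is currently a silent no-op — presumably it will defer to the physical
// dataset's runtime (editorial assumption; the diff leaves this as a TODO).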
//TODO implement me
return nil
}

View File

@ -21,12 +21,13 @@ import (
"fmt"
"time"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/go-logr/logr"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
)
const (

127
pkg/utils/datamigrate.go Normal file
View File

@ -0,0 +1,127 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"context"
"fmt"
batchv1 "k8s.io/api/batch/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
)
// GetDataMigrate gets the DataMigrate given its name and namespace
func GetDataMigrate(client client.Client, name, namespace string) (*datav1alpha1.DataMigrate, error) {
key := types.NamespacedName{
Namespace: namespace,
Name: name,
}
var dataMigrate datav1alpha1.DataMigrate
if err := client.Get(context.TODO(), key, &dataMigrate); err != nil {
return nil, err
}
return &dataMigrate, nil
}
// GetDataMigrateJob gets the DataMigrate job given its name and namespace
func GetDataMigrateJob(client client.Client, name, namespace string) (*batchv1.Job, error) {
key := types.NamespacedName{
Namespace: namespace,
Name: name,
}
var job batchv1.Job
if err := client.Get(context.TODO(), key, &job); err != nil {
return nil, err
}
return &job, nil
}
// GetDataMigrateReleaseName returns DataMigrate helm release's name given the DataMigrate's name
func GetDataMigrateReleaseName(name string) string {
return fmt.Sprintf("%s-migrate", name)
}
// GetDataMigrateJobName returns DataMigrate job's name given the DataMigrate helm release's name
func GetDataMigrateJobName(releaseName string) string {
return fmt.Sprintf("%s-migrate", releaseName)
}
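Note that the two name helpers compose: the release name is derived from the DataMigrate's name, and the job name from the release name, so the "-migrate" suffix stacks twice end to end. A quick worked example:

releaseName := GetDataMigrateReleaseName("demo") // "demo-migrate"
jobName := GetDataMigrateJobName(releaseName)    // "demo-migrate-migrate"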
// GetDataMigrateRef returns the identity of the DataMigrate by combining its namespace and name.
// The identity is used for identifying current lock holder on the target dataset.
func GetDataMigrateRef(name, namespace string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}
func GetTargetDatasetOfMigrate(client client.Client, dataMigrate datav1alpha1.DataMigrate) (targetDataset *datav1alpha1.Dataset, err error) {
var fromDataset, toDataset *datav1alpha1.Dataset
if dataMigrate.Spec.To.DataSet != nil && dataMigrate.Spec.To.DataSet.Name != "" {
toDataset, err = GetDataset(client, dataMigrate.Spec.To.DataSet.Name, dataMigrate.Spec.To.DataSet.Namespace)
if err != nil {
return
}
// if runtimeType is not specified, we will use the toDataset as the targetDataset
if dataMigrate.Spec.RuntimeType == "" {
targetDataset = toDataset
return
}
// if runtimeType is specified, check if toDataset's accelerate runtime type is the same as the runtimeType
index, boundedRuntime := GetRuntimeByCategory(toDataset.Status.Runtimes, common.AccelerateCategory)
if index == -1 {
err = fmt.Errorf("bounded accelerate runtime not ready")
return
}
if boundedRuntime.Type == dataMigrate.Spec.RuntimeType {
targetDataset = toDataset
return
}
}
if dataMigrate.Spec.From.DataSet != nil && dataMigrate.Spec.From.DataSet.Name != "" {
fromDataset, err = GetDataset(client, dataMigrate.Spec.From.DataSet.Name, dataMigrate.Spec.From.DataSet.Namespace)
if err != nil {
return
}
// if runtimeType is not specified, we will use the fromDataset as the targetDataset
if dataMigrate.Spec.RuntimeType == "" {
targetDataset = fromDataset
return
}
// if runtimeType is specified, check if fromDataset's accelerate runtime type is the same as the runtimeType
index, boundedRuntime := GetRuntimeByCategory(fromDataset.Status.Runtimes, common.AccelerateCategory)
if index == -1 {
err = fmt.Errorf("bounded accelerate runtime not ready")
return
}
if boundedRuntime.Type == dataMigrate.Spec.RuntimeType {
targetDataset = fromDataset
return
}
}
return nil, apierrors.NewNotFound(schema.GroupResource{
Group: datav1alpha1.Group,
Resource: "datasets",
}, "dataset")
}
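The lookup gives the "to" side precedence: when both sides reference datasets and no runtimeType is set, the destination dataset is returned first. A hedged sketch (the k8sClient and dataset names are illustrative assumptions):

// both sides name datasets, no runtimeType set
dm := datav1alpha1.DataMigrate{
	Spec: datav1alpha1.DataMigrateSpec{
		From: datav1alpha1.DataToMigrate{DataSet: &datav1alpha1.DatasetToMigrate{Name: "src", Namespace: "fluid"}},
		To:   datav1alpha1.DataToMigrate{DataSet: &datav1alpha1.DatasetToMigrate{Name: "dst", Namespace: "fluid"}},
	},
}
target, err := GetTargetDatasetOfMigrate(k8sClient, dm) // returns the "dst" dataset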

View File

@ -0,0 +1,301 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"testing"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
)
func TestGetDataMigrate(t *testing.T) {
mockDataMigrateName := "fluid-test-data-migrate"
mockDataMigrateNamespace := "default"
initDataMigrate := &datav1alpha1.DataMigrate{
ObjectMeta: metav1.ObjectMeta{
Name: mockDataMigrateName,
Namespace: mockDataMigrateNamespace,
},
}
s := runtime.NewScheme()
s.AddKnownTypes(datav1alpha1.GroupVersion, initDataMigrate)
fakeClient := fake.NewFakeClientWithScheme(s, initDataMigrate)
testCases := map[string]struct {
name string
namespace string
wantName string
notFound bool
}{
"test get DataMigrate case 1": {
name: mockDataMigrateName,
namespace: mockDataMigrateNamespace,
wantName: mockDataMigrateName,
notFound: false,
},
"test get DataMigrate case 2": {
name: mockDataMigrateName + "not-exist",
namespace: mockDataMigrateNamespace,
wantName: "",
notFound: true,
},
}
for k, item := range testCases {
gotDataMigrate, err := GetDataMigrate(fakeClient, item.name, item.namespace)
if item.notFound {
if err == nil && gotDataMigrate != nil {
t.Errorf("%s check failure, want get err, but get nil", k)
}
} else {
if gotDataMigrate.Name != item.wantName {
t.Errorf("%s check failure, want DataMigrate name:%s, got DataMigrate name:%s", k, item.wantName, gotDataMigrate.Name)
}
}
}
}
func TestGetDataMigrateJob(t *testing.T) {
mockJobName := "fluid-test-job"
mockJobNamespace := "default"
initJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: mockJobName,
Namespace: mockJobNamespace,
},
}
fakeClient := fake.NewFakeClient(initJob)
testCases := map[string]struct {
name string
namespace string
wantName string
notFound bool
}{
"test get DataMigrate Job case 1": {
name: mockJobName,
namespace: mockJobNamespace,
wantName: mockJobName,
notFound: false,
},
"test get DataMigrate Job case 2": {
name: mockJobName + "not-exist",
namespace: mockJobNamespace,
wantName: "",
notFound: true,
},
}
for k, item := range testCases {
gotJob, err := GetDataMigrateJob(fakeClient, item.name, item.namespace)
if item.notFound {
if err == nil && gotJob != nil {
t.Errorf("%s check failure, want get err, but get nil", k)
}
} else {
if gotJob.Name != item.wantName {
t.Errorf("%s check failure, want DataMigrate Job name:%s, got DataMigrate Job name:%s", k, item.wantName, gotJob.Name)
}
}
}
}
func TestGetDataMigrateJobName(t *testing.T) {
type args struct {
releaseName string
}
tests := []struct {
name string
args args
want string
}{
{
name: "test",
args: args{
releaseName: "test",
},
want: "test-migrate",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetDataMigrateJobName(tt.args.releaseName); got != tt.want {
t.Errorf("GetDataMigrateJobName() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetDataMigrateRef(t *testing.T) {
type args struct {
name string
namespace string
}
tests := []struct {
name string
args args
want string
}{
{
name: "test",
args: args{
name: "test",
namespace: "default",
},
want: "default/test",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetDataMigrateRef(tt.args.name, tt.args.namespace); got != tt.want {
t.Errorf("GetDataMigrateRef() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetDataMigrateReleaseName(t *testing.T) {
type args struct {
name string
}
tests := []struct {
name string
args args
want string
}{
{
name: "test",
args: args{
name: "test",
},
want: "test-migrate",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetDataMigrateReleaseName(tt.args.name); got != tt.want {
t.Errorf("GetDataMigrateReleaseName() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetTargetDatasetOfMigrate(t *testing.T) {
dataSet := &datav1alpha1.Dataset{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "fluid",
},
}
runtimeObjs := []runtime.Object{}
runtimeObjs = append(runtimeObjs, dataSet.DeepCopy())
fakeClient := fake.NewFakeClientWithScheme(testScheme, runtimeObjs...)
type args struct {
dataMigrate datav1alpha1.DataMigrate
}
tests := []struct {
name string
args args
wantDataset *datav1alpha1.Dataset
wantErr bool
}{
{
name: "test-from",
args: args{
dataMigrate: datav1alpha1.DataMigrate{
Spec: datav1alpha1.DataMigrateSpec{
From: datav1alpha1.DataToMigrate{
DataSet: &datav1alpha1.DatasetToMigrate{
Name: "test-dataset",
Namespace: "fluid",
},
},
},
},
},
wantDataset: &datav1alpha1.Dataset{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "fluid",
},
},
wantErr: false,
},
{
name: "test-to",
args: args{
dataMigrate: datav1alpha1.DataMigrate{
Spec: datav1alpha1.DataMigrateSpec{
To: datav1alpha1.DataToMigrate{
DataSet: &datav1alpha1.DatasetToMigrate{
Name: "test-dataset",
Namespace: "fluid",
},
},
},
},
},
wantDataset: &datav1alpha1.Dataset{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dataset",
Namespace: "fluid",
},
},
wantErr: false,
},
{
name: "test-not-exist",
args: args{
dataMigrate: datav1alpha1.DataMigrate{
Spec: datav1alpha1.DataMigrateSpec{
To: datav1alpha1.DataToMigrate{
DataSet: &datav1alpha1.DatasetToMigrate{
Name: "not-exist-dataset",
Namespace: "fluid",
},
},
},
},
},
wantDataset: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotDataset, err := GetTargetDatasetOfMigrate(fakeClient, tt.args.dataMigrate)
if (err != nil) != tt.wantErr {
t.Errorf("GetTargetDatasetOfMigrate() error = %v, wantErr %v", err, tt.wantErr)
return
}
if (gotDataset == nil) != (tt.wantDataset == nil) {
t.Errorf("GetTargetDatasetOfMigrate() gotDataset = %v, want %v", gotDataset, tt.wantDataset)
return
}
if gotDataset != nil {
if gotDataset.Name != tt.wantDataset.Name || gotDataset.Namespace != tt.wantDataset.Namespace {
t.Errorf("GetTargetDatasetOfMigrate() gotDataset = %v, want %v", gotDataset, tt.wantDataset)
}
}
})
}
}

View File

@ -5,12 +5,14 @@ import (
"reflect"
"testing"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
)
var (
@ -20,6 +22,7 @@ var (
func init() {
testScheme = runtime.NewScheme()
_ = v1.AddToScheme(testScheme)
_ = datav1alpha1.AddToScheme(testScheme)
}
func TestChangeNodeLabelWithUpdateModel(t *testing.T) {

View File

@ -120,6 +120,62 @@ def create_dataset_and_runtime(dataset_name):
print("Create juicefs runtime {}".format(dataset_name))
def create_datamigrate(datamigrate_name, dataset_name):
api = client.CustomObjectsApi()
my_datamigrate = {
"apiVersion": "data.fluid.io/v1alpha1",
"kind": "DataMigrate",
"metadata": {"name": datamigrate_name, "namespace": APP_NAMESPACE},
"spec": {
"image": "registry.cn-hangzhou.aliyuncs.com/juicefs/juicefs-fuse",
"imageTag": "nightly",
"from": {
"dataset": {"name": dataset_name, "namespace": APP_NAMESPACE}
},
"to": {"externalStorage": {
"uri": "minio://%s:9000/minio/test/" % NODE_IP,
"encryptOptions": [
{"name": "access-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "accesskey"}}},
{"name": "secret-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "secretkey"}}},
]
}}
},
}
api.create_namespaced_custom_object(
group="data.fluid.io",
version="v1alpha1",
namespace="default",
plural="datamigrates",
body=my_datamigrate,
)
print("Create datamigrate {}".format(datamigrate_name))
def check_datamigrate_complete(datamigrate_name):
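# poll the DataMigrate CR once per second until status.phase is "Complete", giving up after 300 tries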
api = client.CustomObjectsApi()
count = 0
while count < 300:
resource = api.get_namespaced_custom_object(
group="data.fluid.io",
version="v1alpha1",
name=datamigrate_name,
namespace=APP_NAMESPACE,
plural="datamigrates"
)
if "status" in resource:
if "phase" in resource["status"]:
if resource["status"]["phase"] == "Complete":
print("Datamigrate {} is complete.".format(datamigrate_name))
return True
time.sleep(1)
count += 1
print("Datamigrate {} is not complete within 300s.".format(datamigrate_name))
return False
def get_worker_node(dataset_name):
api = client.CoreV1Api()
pod_name = "{}-worker-0".format(dataset_name)
@ -387,6 +443,18 @@ def clean_up_dataset_and_runtime(dataset_name):
return False
def clean_up_datamigrate(datamigrate_name):
custom_api = client.CustomObjectsApi()
custom_api.delete_namespaced_custom_object(
group="data.fluid.io",
version="v1alpha1",
name=datamigrate_name,
namespace=APP_NAMESPACE,
plural="datamigrates"
)
print("Datamigrate {} deleted".format(datamigrate_name))
def clean_up_secret():
core_api = client.CoreV1Api()
core_api.delete_namespaced_secret(name=SECRET_NAME, namespace=APP_NAMESPACE)
@ -400,6 +468,7 @@ def main():
# ------- test normal mode -------
# ****************************************************************
dataset_name = "jfsdemo"
datamigrate_name = "jfsdemo"
test_write_job = "demo-write"
test_read_job = "demo-read"
try:
@ -422,18 +491,33 @@ def main():
create_data_read_job(dataset_name, test_read_job)
if not check_data_job_status(test_read_job):
raise Exception("read job {} in normal mode failed.".format(test_read_job))
# ****************************************************************
# ------- test data migrate -------
# ****************************************************************
# 1. create datamigrate
create_datamigrate(datamigrate_name, dataset_name)
# 2. check datamigrate status
if not check_datamigrate_complete(datamigrate_name):
raise Exception("datamigrate {} failed.".format(datamigrate_name))
except Exception as e:
print(e)
exit(-1)
finally:
# 4. clean up write & read data job
# clear
# 1. clean up write & read data job
clean_job(test_write_job)
clean_job(test_read_job)
# 5. clean up dataset and runtime
# 2. clean up datamigrate
clean_up_datamigrate(datamigrate_name)
# 3. clean up dataset and runtime
clean_up_dataset_and_runtime(dataset_name)
# 6. clean up secret
# 4. clean up secret
clean_up_secret()
# ****************************************************************
@ -452,7 +536,7 @@ def main():
# ****************************************************************
# ------- test sidecar mode -------
# ********************************
# ****************************************************************
dataset_name = "jfsdemo-sidecar"
test_write_job = "demo-write-sidecar"
test_read_job = "demo-read-sidecar"