Cherry-pick to 0.9: fix csi plugin concurrency issue on FuseRecovery and NodeUnpublishVolume (#3448) (#3453)
* Bugfix: ignore not connected error in NodeUnpublishVolume (#3445)
  * ignore not connected error in NodeUnpublishVolume
  * fix check nil error
  * simplify error judgment
* bugfix: fix csi plugin concurrency issue on FuseRecovery and NodeUnpublishVolume (#3448)
  * Add comments for NodeUnpublishVolume
  * Refactor NodeUnpublishVolume code
  * FuseRecovery uses volume locks to avoid race conditions
  * Refactor node server with codes.Internal error code
  * Rename CSI Config to RunningContext
  * Fix github actions checks
  * Fix lock release
  * Refactor recover logic

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>
Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
Co-authored-by: wangshulin <89928606+wangshli@users.noreply.github.com>
commit 7b01a4c59f
parent 6acbbcbb75
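Both halves of this fix serialize work on a mount path through one shared lock set: NodeUnpublishVolume and the FuseRecovery loop each call TryAcquire on the target path before touching it and Release when done, and the lock set is created once (utils.NewVolumeLocks() in the diff below) and handed to both components through the new RunningContext. The sketch below illustrates that try-lock pattern under the assumption of a mutex-guarded set of in-flight paths; it is a minimal stand-in, not Fluid's actual utils.VolumeLocks implementation, and the path used in main is hypothetical.

// Illustrative sketch only: Fluid's real helper is utils.VolumeLocks
// (NewVolumeLocks / TryAcquire / Release as used in the diff below); the
// internals here are an assumed minimal implementation of the same pattern.
package main

import (
	"fmt"
	"sync"
)

// pathLocks tracks in-flight operations per target path so that only one
// goroutine (e.g. NodeUnpublishVolume or the FUSE recovery loop) works on a
// given mount path at a time.
type pathLocks struct {
	mu       sync.Mutex
	inflight map[string]struct{}
}

func newPathLocks() *pathLocks {
	return &pathLocks{inflight: make(map[string]struct{})}
}

// TryAcquire returns false when another operation already holds the path.
func (l *pathLocks) TryAcquire(path string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, busy := l.inflight[path]; busy {
		return false
	}
	l.inflight[path] = struct{}{}
	return true
}

// Release frees the path for the next operation.
func (l *pathLocks) Release(path string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.inflight, path)
}

func main() {
	locks := newPathLocks() // shared instance, analogous to utils.NewVolumeLocks()
	targetPath := "/var/lib/kubelet/pods/example/volumes/kubernetes.io~csi/example/mount" // hypothetical path

	if ok := locks.TryAcquire(targetPath); !ok {
		// Mirrors the diff's behavior: give up immediately so the caller retries later.
		fmt.Println("operation on targetPath already exists, aborting")
		return
	}
	defer locks.Release(targetPath)

	fmt.Println("safe to unmount or recover:", targetPath)
}

In the actual change, a failed TryAcquire surfaces as a codes.Aborted gRPC error from NodeUnpublishVolume (so kubelet retries the unpublish) and as a skipped iteration in the FUSE recovery loop.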
@@ -30,6 +30,7 @@ import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/csi"
"github.com/fluid-cloudnative/fluid/pkg/csi/config"
"github.com/fluid-cloudnative/fluid/pkg/utils"
utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature"
"github.com/golang/glog"
"github.com/spf13/cobra"

@@ -110,15 +111,17 @@ func handle() {
panic(fmt.Sprintf("csi: unable to create controller manager due to error %v", err))
}
config := config.Config{
NodeId: nodeID,
Endpoint: endpoint,
PruneFs: pruneFs,
PrunePath: prunePath,
KubeletConfigPath: kubeletKubeConfigPath,
runningContext := config.RunningContext{
Config: config.Config{
NodeId: nodeID,
Endpoint: endpoint,
PruneFs: pruneFs,
PrunePath: prunePath,
KubeletConfigPath: kubeletKubeConfigPath,
},
VolumeLocks: utils.NewVolumeLocks(),
}
if err = csi.SetupWithManager(mgr, config); err != nil {
if err = csi.SetupWithManager(mgr, runningContext); err != nil {
panic(fmt.Sprintf("unable to set up manager due to error %v", err))
}
@@ -16,6 +16,13 @@ limitations under the License.
package config
import "github.com/fluid-cloudnative/fluid/pkg/utils"
type RunningContext struct {
Config
VolumeLocks *utils.VolumeLocks
}
type Config struct {
NodeId string
Endpoint string
@@ -44,11 +44,13 @@ type driver struct {
nodeAuthorizedClient *kubernetes.Clientset
csiDriver *csicommon.CSIDriver
nodeId, endpoint string
locks *utils.VolumeLocks
}
var _ manager.Runnable = &driver{}
func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset) *driver {
func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset, locks *utils.VolumeLocks) *driver {
glog.Infof("Driver: %v version: %v", driverName, version)
proto, addr := utils.SplitSchemaAddr(endpoint)

@@ -76,6 +78,7 @@ func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.R
client: client,
nodeAuthorizedClient: nodeAuthorizedClient,
apiReader: apiReader,
locks: locks,
}
}

@@ -92,7 +95,7 @@ func (d *driver) newNodeServer() *nodeServer {
client: d.client,
apiReader: d.apiReader,
nodeAuthorizedClient: d.nodeAuthorizedClient,
locks: utils.NewVolumeLocks(),
locks: d.locks,
}
}
@@ -82,7 +82,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if err := os.MkdirAll(targetPath, 0750); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else {
glog.Infof("MkdirAll successful. %v", targetPath)
glog.Infof("NodePublishVolume: MkdirAll successful on %v", targetPath)
}
//isMount = true
} else {

@@ -91,21 +91,20 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
if isMount {
glog.Infof("It's already mounted to %v", targetPath)
glog.Infof("NodePublishVolume: already mounted to %v, do nothing", targetPath)
return &csi.NodePublishVolumeResponse{}, nil
} else {
glog.Infof("Try to mount to %v", targetPath)
}
glog.Infof("NodePublishVolume: start mounting staging path to %v", targetPath)
// 0. check if read only
readOnly := false
if req.GetVolumeCapability() == nil {
glog.Infoln("Volume Capability is nil")
glog.Infoln("NodePublishVolume: found volume capability is nil")
} else {
mode := req.GetVolumeCapability().GetAccessMode().GetMode()
if mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
readOnly = true
glog.Infof("Set the mount option readonly=%v", readOnly)
glog.Infof("NodePublishVolume: set the mount option readonly=%v", readOnly)
}
}

@@ -155,7 +154,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
command := exec.Command("mount", args...)
glog.V(4).Infoln(command)
glog.V(3).Infof("NodePublishVolume: exec command %v", command)
stdoutStderr, err := command.CombinedOutput()
glog.V(4).Infoln(string(stdoutStderr))
if err != nil {

@@ -167,12 +166,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
return nil, status.Error(codes.Internal, err.Error())
} else {
glog.V(4).Infof("Succeed in binding %s to %s", mountPath, targetPath)
glog.V(3).Infof("NodePublishVolume: succeed in binding %s to %s", mountPath, targetPath)
}
return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume umounts every mounted file systems on the given req.GetTargetPath() until it's cleaned up.
// If anything unexpected happened during the umount process, it returns error and wait for retries.
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
// check targetpath validity

@@ -180,52 +181,55 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume operation requires targetPath but is not provided")
}
// The lock is to avoid race condition
// The lock is to avoid race condition, make sure only one goroutine(including the FUSE Recovery goroutine) is handling the targetPath
if lock := ns.locks.TryAcquire(targetPath); !lock {
return nil, status.Errorf(codes.Aborted, "NodeUnpublishVolume operation on targetPath %s already exists", targetPath)
}
defer ns.locks.Release(targetPath)
// check path existence
_, err := os.Stat(targetPath)
// No need to unmount non-existing targetPath
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
if err != nil {
return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
}
// targetPath may be mount bind many times when mount point recovered.
// targetPath may be bind mount many times when mount point recovered.
// umount until it's not mounted.
mounter := mount.New("")
for {
notMount, err := mounter.IsLikelyNotMountPoint(targetPath)
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
break
}
if err != nil {
glog.V(3).Infoln(err)
if corrupted := mount.IsCorruptedMnt(err); !corrupted {
return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
if !mount.IsCorruptedMnt(err) {
// stat targetPath with unexpected error
glog.Errorf("NodeUnpublishVolume: stat targetPath %s with error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: stat targetPath %s: %v", targetPath, err)
} else {
// targetPath is corrupted
glog.V(3).Infof("NodeUnpublishVolume: detected corrupted mountpoint on path %s with error %v", targetPath, err)
}
}
if notMount {
glog.V(3).Infof("umount:%s success", targetPath)
glog.V(3).Infof("NodeUnpublishVolume: umount %s success", targetPath)
break
}
glog.V(3).Infof("umount:%s", targetPath)
glog.V(3).Infof("NodeUnpublishVolume: exec umount %s", targetPath)
err = mounter.Unmount(targetPath)
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up when umounting it", targetPath)
break
}
if err != nil {
glog.V(3).Infoln(err)
return nil, errors.Wrapf(err, "NodeUnpublishVolume: umount targetPath %s error %v", targetPath, err)
glog.Errorf("NodeUnpublishVolume: umount targetPath %s with error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: umount targetPath %s: %v", targetPath, err)
}
}
err = mount.CleanupMountPoint(req.GetTargetPath(), mount.New(""), false)
err := mount.CleanupMountPoint(targetPath, mounter, false)
if err != nil {
glog.V(3).Infoln(err)
glog.Errorf("NodeUnpublishVolume: failed when cleanupMountPoint on path %s: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: failed when cleanupMountPoint on path %s: %v", targetPath, err)
} else {
glog.V(4).Infof("Succeed in umounting %s", targetPath)
glog.V(4).Infof("NodeUnpublishVolume: succeed in umounting %s", targetPath)
}
return &csi.NodeUnpublishVolumeResponse{}, nil

@@ -271,14 +275,14 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
var shouldCleanFuse bool
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
glog.Infof("Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
switch cleanPolicy {
case v1alpha1.OnDemandCleanPolicy:
shouldCleanFuse = true
case v1alpha1.OnRuntimeDeletedCleanPolicy:
shouldCleanFuse = false
default:
return nil, errors.Errorf("Unknown Fuse clean policy: %s", cleanPolicy)
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
}
if !shouldCleanFuse {

@@ -343,7 +347,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
// 2. clean up broken mount point
fluidPath := req.GetVolumeContext()[common.VolumeAttrFluidPath]
if ignoredErr := cleanUpBrokenMountPoint(fluidPath); ignoredErr != nil {
glog.Warningf("Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr)
glog.Warningf("NodeStageVolume: Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr)
}
// 3. get runtime namespace and name
@@ -23,13 +23,13 @@ import (
)
// Register initializes the csi driver and registers it to the controller manager.
func Register(mgr manager.Manager, cfg config.Config) error {
client, err := kubelet.InitNodeAuthorizedClient(cfg.KubeletConfigPath)
func Register(mgr manager.Manager, ctx config.RunningContext) error {
client, err := kubelet.InitNodeAuthorizedClient(ctx.KubeletConfigPath)
if err != nil {
return err
}
csiDriver := NewDriver(cfg.NodeId, cfg.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client)
csiDriver := NewDriver(ctx.NodeId, ctx.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client, ctx.VolumeLocks)
if err := mgr.Add(csiDriver); err != nil {
return err
@@ -60,6 +60,8 @@ type FuseRecover struct {
recoverFusePeriod time.Duration
recoverWarningThreshold int
locks *utils.VolumeLocks
}
func initializeKubeletClient() (*kubelet.KubeletClient, error) {

@@ -102,7 +104,7 @@ func initializeKubeletClient() (*kubelet.KubeletClient, error) {
return kubeletClient, nil
}
func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, apiReader client.Reader) (*FuseRecover, error) {
func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, apiReader client.Reader, locks *utils.VolumeLocks) (*FuseRecover, error) {
glog.V(3).Infoln("start csi recover")
mountRoot, err := utils.GetMountRoot()
if err != nil {

@@ -129,6 +131,7 @@ func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, api
Recorder: recorder,
recoverFusePeriod: recoverFusePeriod,
recoverWarningThreshold: recoverWarningThreshold,
locks: locks,
}, nil
}

@@ -151,7 +154,7 @@ func (r *FuseRecover) runOnce() {
r.recover()
}
func (r FuseRecover) recover() {
func (r *FuseRecover) recover() {
brokenMounts, err := mountinfo.GetBrokenMountPoints()
if err != nil {
glog.Error(err)

@@ -159,34 +162,20 @@ func (r FuseRecover) recover() {
}
for _, point := range brokenMounts {
glog.V(4).Infof("Get broken mount point: %v", point)
// if app container restart, umount duplicate mount may lead to recover successed but can not access data
// so we only umountDuplicate when it has mounted more than the recoverWarningThreshold
// please refer to https://github.com/fluid-cloudnative/fluid/issues/3399 for more information
if point.Count > r.recoverWarningThreshold {
glog.Warningf("Mountpoint %s has been mounted %v times, exceeding the recoveryWarningThreshold %v, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potential make data access connection broken", point.MountPath, point.Count, r.recoverWarningThreshold)
r.eventRecord(point, corev1.EventTypeWarning, common.FuseUmountDuplicate)
r.umountDuplicate(point)
}
if err := r.recoverBrokenMount(point); err != nil {
r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
continue
}
r.eventRecord(point, corev1.EventTypeNormal, common.FuseRecoverSucceed)
r.doRecover(point)
}
}
func (r *FuseRecover) recoverBrokenMount(point mountinfo.MountPoint) (err error) {
glog.V(3).Infof("Start recovery: [%s], source path: [%s]", point.MountPath, point.SourcePath)
// recovery for each bind mount path
mountOption := []string{"bind"}
if point.ReadOnly {
mountOption = append(mountOption, "ro")
}
glog.V(3).Infof("Start exec cmd: mount %s %s -o %v \n", point.SourcePath, point.MountPath, mountOption)
glog.V(3).Infof("FuseRecovery: Start exec cmd: mount %s %s -o %v \n", point.SourcePath, point.MountPath, mountOption)
if err := r.Mount(point.SourcePath, point.MountPath, "none", mountOption); err != nil {
glog.Errorf("exec cmd: mount -o bind %s %s err :%v", point.SourcePath, point.MountPath, err)
glog.Errorf("FuseRecovery: exec cmd: mount -o bind %s %s with err :%v", point.SourcePath, point.MountPath, err)
}
return
}

@@ -196,9 +185,9 @@ func (r *FuseRecover) recoverBrokenMount(point mountinfo.MountPoint) (err error)
// don't umount all item, 'mountPropagation' will lose efficacy.
func (r *FuseRecover) umountDuplicate(point mountinfo.MountPoint) {
for i := point.Count; i > 1; i-- {
glog.V(3).Infof("count: %d, start exec cmd: umount %s", i, point.MountPath)
glog.V(3).Infof("FuseRecovery: count: %d, start exec cmd: umount %s", i, point.MountPath)
if err := r.Unmount(point.MountPath); err != nil {
glog.Errorf("exec cmd: umount %s err: %v", point.MountPath, err)
glog.Errorf("FuseRecovery: exec cmd: umount %s with err: %v", point.MountPath, err)
}
}
}

@@ -231,3 +220,52 @@ func (r *FuseRecover) eventRecord(point mountinfo.MountPoint, eventType, eventRe
r.Recorder.Eventf(dataset, eventType, eventReason, "Mountpoint %s has been mounted %v times, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potential make data access connection broken", point.MountPath, point.Count)
}
}
func (r *FuseRecover) shouldRecover(mountPath string) (should bool, err error) {
mounter := mount.New("")
notMount, err := mounter.IsLikelyNotMountPoint(mountPath)
if os.IsNotExist(err) || (err == nil && notMount) {
// Perhaps the mountPath has been cleaned up in other goroutine
return false, nil
}
if err != nil && !mount.IsCorruptedMnt(err) {
// unexpected error
return false, err
}
return true, nil
}
func (r *FuseRecover) doRecover(point mountinfo.MountPoint) {
if lock := r.locks.TryAcquire(point.MountPath); !lock {
glog.V(4).Infof("FuseRecovery: fail to acquire lock on path %s, skip recovering it", point.MountPath)
return
}
defer r.locks.Release(point.MountPath)
should, err := r.shouldRecover(point.MountPath)
if err != nil {
glog.Warningf("FuseRecovery: found path %s which is unable to recover due to error %v, skip it", point.MountPath, err)
return
}
if !should {
glog.V(3).Infof("FuseRecovery: path %s has already been cleaned up, skip recovering it", point.MountPath)
return
}
glog.V(3).Infof("FuseRecovery: recovering broken mount point: %v", point)
// if app container restart, umount duplicate mount may lead to recover successed but can not access data
// so we only umountDuplicate when it has mounted more than the recoverWarningThreshold
// please refer to https://github.com/fluid-cloudnative/fluid/issues/3399 for more information
if point.Count > r.recoverWarningThreshold {
glog.Warningf("FuseRecovery: Mountpoint %s has been mounted %v times, exceeding the recoveryWarningThreshold %v, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potentially make data access connection broken", point.MountPath, point.Count, r.recoverWarningThreshold)
r.eventRecord(point, corev1.EventTypeWarning, common.FuseUmountDuplicate)
r.umountDuplicate(point)
}
if err := r.recoverBrokenMount(point); err != nil {
r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
return
}
r.eventRecord(point, corev1.EventTypeNormal, common.FuseRecoverSucceed)
}
@@ -97,6 +97,7 @@ func TestRecover_run(t *testing.T) {
ApiReader: fakeClient,
Recorder: record.NewFakeRecorder(1),
recoverFusePeriod: testfuseRecoverPeriod,
locks: utils.NewVolumeLocks(),
}
patch1 := ApplyMethod(reflect.TypeOf(fakeMounter), "Mount", func(_ *mount.FakeMounter, source string, target string, _ string, _ []string) error {

@@ -122,6 +123,11 @@ func TestRecover_run(t *testing.T) {
})
defer patch3.Reset()
patch4 := ApplyPrivateMethod(r, "shouldRecover", func(mountPath string) (bool, error) {
return true, nil
})
defer patch4.Reset()
r.runOnce()
if target, exists := mockedFsMounts[sourcePath]; !exists || target != targetPath {

@@ -293,10 +299,12 @@ func TestNewFuseRecover(t *testing.T) {
kubeClient client.Client
recorder record.EventRecorder
recoverFusePeriod string
locks *utils.VolumeLocks
}
fakeClient := fake.NewFakeClient()
fakeRecorder := record.NewFakeRecorder(1)
volumeLocks := utils.NewVolumeLocks()
tests := []struct {
name string

@@ -310,6 +318,7 @@ func TestNewFuseRecover(t *testing.T) {
kubeClient: fakeClient,
recorder: fakeRecorder,
recoverFusePeriod: "5s",
locks: volumeLocks,
},
want: &FuseRecover{
SafeFormatAndMount: mount.SafeFormatAndMount{

@@ -321,6 +330,7 @@ func TestNewFuseRecover(t *testing.T) {
Recorder: fakeRecorder,
recoverFusePeriod: defaultFuseRecoveryPeriod,
recoverWarningThreshold: defaultRecoverWarningThreshold,
locks: volumeLocks,
},
wantErr: false,
},

@@ -330,7 +340,7 @@ func TestNewFuseRecover(t *testing.T) {
t.Setenv(utils.MountRoot, "/runtime-mnt")
t.Setenv(FuseRecoveryPeriod, tt.args.recoverFusePeriod)
got, err := NewFuseRecover(tt.args.kubeClient, tt.args.recorder, tt.args.kubeClient)
got, err := NewFuseRecover(tt.args.kubeClient, tt.args.recorder, tt.args.kubeClient, tt.args.locks)
if (err != nil) != tt.wantErr {
t.Errorf("NewFuseRecover() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -24,8 +24,8 @@ import (
)
// Register initializes the fuse recover and registers it to the controller manager.
func Register(mgr manager.Manager, config config.Config) error {
fuseRecover, err := NewFuseRecover(mgr.GetClient(), mgr.GetEventRecorderFor("FuseRecover"), mgr.GetAPIReader())
func Register(mgr manager.Manager, ctx config.RunningContext) error {
fuseRecover, err := NewFuseRecover(mgr.GetClient(), mgr.GetEventRecorderFor("FuseRecover"), mgr.GetAPIReader(), ctx.VolumeLocks)
if err != nil {
return err
}
@@ -28,7 +28,7 @@ import (
type registrationFuncs struct {
enabled func() bool
register func(mgr manager.Manager, cfg config.Config) error
register func(mgr manager.Manager, ctx config.RunningContext) error
}
var registraions map[string]registrationFuncs

@@ -42,11 +42,11 @@ func init() {
}
// SetupWithManager registers all the enabled components defined in registrations to the controller manager.
func SetupWithManager(mgr manager.Manager, cfg config.Config) error {
func SetupWithManager(mgr manager.Manager, ctx config.RunningContext) error {
for rName, r := range registraions {
if r.enabled() {
glog.Infof("Registering %s to controller manager", rName)
if err := r.register(mgr, cfg); err != nil {
if err := r.register(mgr, ctx); err != nil {
glog.Errorf("Got error when registering %s, error: %v", rName, err)
return err
}
@@ -12,7 +12,7 @@ import (
)
// Register update the host /etc/updatedb.conf
func Register(_ manager.Manager, cfg config.Config) error {
func Register(_ manager.Manager, ctx config.RunningContext) error {
content, err := os.ReadFile(updatedbConfPath)
if os.IsNotExist(err) {
glog.Info("/etc/updatedb.conf not exist, skip updating")

@@ -21,7 +21,7 @@ func Register(_ manager.Manager, cfg config.Config) error {
if err != nil {
return err
}
newconfig, err := updateConfig(string(content), cfg.PruneFs, []string{cfg.PrunePath})
newconfig, err := updateConfig(string(content), ctx.PruneFs, []string{ctx.PrunePath})
if err != nil {
glog.Warningf("failed to update updatedb.conf %s ", err)
return nil