Skip to content

Commit

Permalink
cephfs: enable VolumeCondition with new health-checker
Browse files Browse the repository at this point in the history
The HealthChecker is configured to use the Staging path pf the volume,
with a `.csi/` subdirectory. In the future this directory could be a
directory that is not under the Published directory.

Fixes: ceph#4219
Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Nov 2, 2023
1 parent 080aa40 commit 40e2aa1
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
2 changes: 2 additions & 0 deletions internal/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -82,6 +83,7 @@ func NewNodeServer(
VolumeLocks: util.NewVolumeLocks(),
kernelMountOptions: kernelMountOptions,
fuseMountOptions: fuseMountOptions,
healthChecker: hc.NewHealthCheckManager(),
}
}

Expand Down
66 changes: 64 additions & 2 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
Expand All @@ -47,6 +48,7 @@ type NodeServer struct {
VolumeLocks *util.VolumeLocks
kernelMountOptions string
fuseMountOptions string
healthChecker hc.Manager
}

func getCredentialsForVolume(
Expand Down Expand Up @@ -228,6 +230,8 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Error(codes.Internal, err.Error())
}

ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)

return &csi.NodeStageVolumeResponse{}, nil
}

Expand Down Expand Up @@ -270,9 +274,24 @@ func (ns *NodeServer) NodeStageVolume(
}
}

ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)

return &csi.NodeStageVolumeResponse{}, nil
}

// startSharedHealthChecker starts a health-checker on the stagingTargetPath.
// This checker can be shared between multiple containers.
//
// TODO: start a FileChecker for read-writable volumes that have an app-data subdir.
func (ns *NodeServer) startSharedHealthChecker(ctx context.Context, volumeID, dir string) {
// The StatChecker works for volumes that do not have a dedicated app-data
// subdirectory, or are read-only.
err := ns.healthChecker.StartSharedChecker(volumeID, dir, hc.StatCheckerType)
if err != nil {
log.WarningLog(ctx, "failed to start healthchecker: %v", err)
}
}

func (ns *NodeServer) mount(
ctx context.Context,
mnt mounter.VolumeMounter,
Expand Down Expand Up @@ -479,7 +498,8 @@ func (ns *NodeServer) NodePublishVolume(

// Ensure staging target path is a mountpoint.

if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil {
isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)

return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -491,7 +511,7 @@ func (ns *NodeServer) NodePublishVolume(

// Check if the volume is already mounted

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
isMnt, err = util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)

Expand Down Expand Up @@ -545,6 +565,10 @@ func (ns *NodeServer) NodeUnpublishVolume(
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()

// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
Expand Down Expand Up @@ -599,6 +623,9 @@ func (ns *NodeServer) NodeUnstageVolume(
}

volID := req.GetVolumeId()

ns.healthChecker.StopSharedChecker(volID)

if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)

Expand Down Expand Up @@ -670,6 +697,13 @@ func (ns *NodeServer) NodeGetCapabilities(
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Expand All @@ -694,6 +728,34 @@ func (ns *NodeServer) NodeGetVolumeStats(
return nil, status.Error(codes.InvalidArgument, err.Error())
}

// health check first, return without stats if unhealthy
healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId(), targetPath)

// If healthy and an error is returned, it means that the checker was not
// started. This could happen when the node-plugin was restarted and the
// volume is already staged and published.
if healthy && msg != nil {
// Start a StatChecker for the mounted targetPath, yhis prevents
// writing a file in the user-visible location. Ideally a (shared)
// FileChecker is started with the stagingTargetPath, but we can't
// get the stagingPath from the request.
err = ns.healthChecker.StartChecker(req.GetVolumeId(), targetPath, hc.StatCheckerType)
if err != nil {
log.WarningLog(ctx, "failed to start healthchecker: %v", err)
}
}

// !healthy indicates a problem with the volume
if !healthy {
return &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{
Abnormal: true,
Message: msg.Error(),
},
}, nil
}

// warning: stat() may hang on an unhealthy volume
stat, err := os.Stat(targetPath)
if err != nil {
if util.IsCorruptedMountError(err) {
Expand Down

0 comments on commit 40e2aa1

Please sign in to comment.