diff --git a/docs/design/proposals/volume-condition.md b/docs/design/proposals/volume-condition.md
new file mode 100644
index 00000000000..e1b0acae00f
--- /dev/null
+++ b/docs/design/proposals/volume-condition.md
@@ -0,0 +1,61 @@
+# Support for CSI `VolumeCondition` aka Volume Health Checker
+
+## health-checker API
+
+Under `internal/health-checker` the Manager for health-checking is
+implemented. The Manager can start a checking process for a given path, return
+the (un)healthy state and stop the checking process when the volume is not
+needed anymore.
+
+The Manager is responsible for creating a suitable checker for the requested
+path. If the path is a block-device, the BlockChecker should be created. For a
+filesystem path (directory), the FileChecker is appropriate.
+
+## CephFS
+
+The health-checker writes to the file `csi-volume-condition.ts` in the root of
+the volume. This file contains a JSON formatted timestamp.
+
+A new `data` directory is introduced for newly created volumes. During the
+`NodeStageVolume` call the root of the volume is mounted, and the `data`
+directory is bind-mounted inside the container when `NodePublishVolume` is
+called.
+
+The `data` directory makes it possible to place Ceph-CSI internal files in the
+root of the volume, without the user/application having access to them.
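+
+## Example: using the health-checker Manager
+
+A minimal sketch of how a caller, like the CephFS node-server, could consume
+the Manager API. The `example` package, the `nodeOperations` helper and its
+`stagingPath` parameter are illustrative only and not part of this proposal.
+
+```go
+package example
+
+import (
+	"fmt"
+
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
+)
+
+func nodeOperations(stagingPath string) error {
+	mgr := hc.NewHealthCheckManager()
+
+	// NodeStageVolume: start health-checking the staged volume.
+	if err := mgr.StartChecker(stagingPath); err != nil {
+		return err
+	}
+
+	// NodeGetVolumeStats: request the current condition; an unhealthy
+	// volume is reported as a csi.VolumeCondition with Abnormal set.
+	if healthy, msg := mgr.IsHealthy(stagingPath); !healthy {
+		return fmt.Errorf("volume is unhealthy: %w", msg)
+	}
+
+	// NodeUnstageVolume: stop the checker when the volume is unstaged.
+	mgr.StopChecker(stagingPath)
+
+	return nil
+}
+```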
diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go
index d1c126fc3d1..2a141761a1d 100644
--- a/internal/cephfs/controllerserver.go
+++ b/internal/cephfs/controllerserver.go
@@ -40,6 +40,12 @@ import (
 	"google.golang.org/grpc/status"
 )
 
+// defaultDataRoot is a directory on the volume that will be mounted inside the
+// container during NodePublishVolume. The parent directory of defaultDataRoot
+// will be mounted during NodeStageVolume, and is available for Ceph-CSI
+// internal consumption.
+const defaultDataRoot = "data"
+
 // ControllerServer struct of CEPH CSI driver with supported methods of CSI
 // controller server spec.
 type ControllerServer struct {
@@ -365,6 +371,12 @@ func (cs *ControllerServer) CreateVolume(
 	volumeContext := k8s.RemoveCSIPrefixedParameters(req.GetParameters())
 	volumeContext["subvolumeName"] = vID.FsSubvolName
 	volumeContext["subvolumePath"] = volOptions.RootPath
+
+	if volOptions.DataRoot == "" {
+		volOptions.DataRoot = defaultDataRoot
+	}
+	volumeContext["dataRoot"] = volOptions.DataRoot
+
 	volume := &csi.Volume{
 		VolumeId:      vID.VolumeID,
 		CapacityBytes: volOptions.Size,
@@ -456,6 +468,12 @@ func (cs *ControllerServer) CreateVolume(
 	volumeContext := k8s.RemoveCSIPrefixedParameters(req.GetParameters())
 	volumeContext["subvolumeName"] = vID.FsSubvolName
 	volumeContext["subvolumePath"] = volOptions.RootPath
+
+	if volOptions.DataRoot == "" {
+		volOptions.DataRoot = defaultDataRoot
+	}
+	volumeContext["dataRoot"] = volOptions.DataRoot
+
 	volume := &csi.Volume{
 		VolumeId:      vID.VolumeID,
 		CapacityBytes: volOptions.Size,
diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go
index 7c6f1b18741..b99ded8bd7c 100644
--- a/internal/cephfs/driver.go
+++ b/internal/cephfs/driver.go
@@ -25,6 +25,7 @@ import (
 	casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
 	csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
 	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
 	"github.com/ceph/ceph-csi/internal/journal"
 	"github.com/ceph/ceph-csi/internal/util"
 	"github.com/ceph/ceph-csi/internal/util/log"
@@ -82,6 +83,7 @@ func NewNodeServer(
 		VolumeLocks:        util.NewVolumeLocks(),
 		kernelMountOptions: kernelMountOptions,
 		fuseMountOptions:   fuseMountOptions,
+		healthChecker:      hc.NewHealthCheckManager(),
 	}
 }
diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go
index 7287cde7605..fd56ebde4f8 100644
--- a/internal/cephfs/nodeserver.go
+++ b/internal/cephfs/nodeserver.go
@@ -29,6 +29,7 @@ import (
 	"github.com/ceph/ceph-csi/internal/cephfs/store"
 	fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
 	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
 	"github.com/ceph/ceph-csi/internal/util"
 	"github.com/ceph/ceph-csi/internal/util/fscrypt"
 	"github.com/ceph/ceph-csi/internal/util/log"
@@ -47,6 +48,7 @@ type NodeServer struct {
 	VolumeLocks        *util.VolumeLocks
 	kernelMountOptions string
 	fuseMountOptions   string
+	healthChecker      hc.Manager
 }
 
 func getCredentialsForVolume(
@@ -209,6 +211,8 @@ func (ns *NodeServer) NodeStageVolume(
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
+	healthCheckPath := getHealthCheckPath(stagingTargetPath, req.GetVolumeContext())
+
 	// Check if the volume is already mounted
 
 	if err = ns.tryRestoreFuseMountInNodeStage(ctx, mnt, stagingTargetPath); err != nil {
@@ -228,6 +232,8 @@
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
+	ns.healthChecker.StartChecker(healthCheckPath)
+
 	return &csi.NodeStageVolumeResponse{}, nil
 }
@@ -270,6 +276,8 @@
 		}
 	}
 
+	ns.healthChecker.StartChecker(healthCheckPath)
+
 	return &csi.NodeStageVolumeResponse{}, nil
 }
@@ -452,6 +460,15 @@ func (ns *NodeServer) NodePublishVolume(
 	targetPath := req.GetTargetPath()
 	volID := fsutil.VolumeID(req.GetVolumeId())
 
+	// dataPath is the directory that will be bind-mounted into the
+	// container. If "dataRoot" is not set in the volume context, dataPath
+	// is the same as the stagingTargetPath.
+	dataPath := stagingTargetPath
+	dataRoot, ok := req.GetVolumeContext()["dataRoot"]
+	if ok {
+		dataPath = path.Join(dataPath, dataRoot)
+	}
+
 	// Considering kubelet make sure the stage and publish operations
 	// are serialized, we dont need any extra locking in nodePublish
@@ -464,7 +481,7 @@ func (ns *NodeServer) NodePublishVolume(
 	if err := ns.tryRestoreFuseMountsInNodePublish(
 		ctx,
 		volID,
-		stagingTargetPath,
+		dataPath,
 		targetPath,
 		req.GetVolumeContext(),
 	); err != nil {
@@ -510,15 +527,15 @@
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 	if encrypted {
-		stagingTargetPath = fscrypt.AppendEncyptedSubdirectory(stagingTargetPath)
-		if err = fscrypt.IsDirectoryUnlocked(stagingTargetPath, "ceph"); err != nil {
+		dataPath = fscrypt.AppendEncyptedSubdirectory(dataPath)
+		if err = fscrypt.IsDirectoryUnlocked(dataPath, "ceph"); err != nil {
 			return nil, status.Error(codes.Internal, err.Error())
 		}
 	}
 
 	if err = mounter.BindMount(
 		ctx,
-		stagingTargetPath,
+		dataPath,
 		targetPath,
 		req.GetReadonly(),
 		mountOptions); err != nil {
@@ -608,6 +625,8 @@ func (ns *NodeServer) NodeUnstageVolume(
 	stagingTargetPath := req.GetStagingTargetPath()
 
+	ns.healthChecker.StopChecker(stagingTargetPath)
+
 	if err = fsutil.RemoveNodeStageMountinfo(fsutil.VolumeID(volID)); err != nil {
 		log.ErrorLog(ctx, "cephfs: failed to remove NodeStageMountinfo for volume %s: %v", volID, err)
 
@@ -670,6 +689,13 @@ func (ns *NodeServer) NodeGetCapabilities(
 				},
 			},
 		},
+		{
+			Type: &csi.NodeServiceCapability_Rpc{
+				Rpc: &csi.NodeServiceCapability_RPC{
+					Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
+				},
+			},
+		},
 		{
 			Type: &csi.NodeServiceCapability_Rpc{
 				Rpc: &csi.NodeServiceCapability_RPC{
@@ -711,8 +737,31 @@ func (ns *NodeServer) NodeGetVolumeStats(
 	}
 
 	if stat.Mode().IsDir() {
-		return csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
+		res, err := csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
+		if err != nil {
+			return nil, err
+		}
+
+		healthy, msg := ns.healthChecker.IsHealthy(req.GetStagingTargetPath())
+		res.VolumeCondition = &csi.VolumeCondition{
+			Abnormal: !healthy,
+		}
+
+		if !healthy {
+			res.VolumeCondition.Message = msg.Error()
+		}
+
+		return res, nil
 	}
 
 	return nil, status.Errorf(codes.InvalidArgument, "targetpath %q is not a directory or device", targetPath)
 }
+
+func getHealthCheckPath(basedir string, volumeContext map[string]string) string {
+	_, ok := volumeContext["dataRoot"]
+	if !ok {
+		return path.Join(basedir, ".meta.csi")
+	}
+
+	return basedir
+}
diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go
index 349e9db6c98..0f49d910a6e 100644
--- a/internal/cephfs/store/volumeoptions.go
+++ b/internal/cephfs/store/volumeoptions.go
@@ -71,6 +71,12 @@ type VolumeOptions struct {
 	ProvisionVolume bool `json:"provisionVolume"`
 	BackingSnapshot bool `json:"backingSnapshot"`
+
+	// DataRoot is set to the directory that is bind-mounted into the
+	// container. The parent directory of the DataRoot is not available to
+	// the end-user, but Ceph-CSI can use it for storing state, doing
+	// health-checks and the like.
+	DataRoot string `json:"dataRoot"`
 }
 
 // Connect a CephFS volume to the Ceph cluster.
@@ -266,6 +272,10 @@ func NewVolumeOptions(
 		return nil, err
 	}
 
+	if err = extractOptionalOption(&opts.DataRoot, "dataRoot", volOptions); err != nil {
+		return nil, err
+	}
+
 	if err = opts.InitKMS(ctx, volOptions, req.GetSecrets()); err != nil {
 		return nil, fmt.Errorf("failed to init KMS: %w", err)
 	}
diff --git a/internal/health-checker/filechecker.go b/internal/health-checker/filechecker.go
new file mode 100644
index 00000000000..c6dd36bf28f
--- /dev/null
+++ b/internal/health-checker/filechecker.go
@@ -0,0 +1,185 @@
+/*
+Copyright 2023 ceph-csi authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package healthchecker
+
+import (
+	"errors"
+	"os"
+	"path"
+	"time"
+)
+
+// command is what is sent through the channel to terminate the go routine.
+type command string
+
+const (
+	// stopCommand is sent through the channel to stop checking.
+	stopCommand = command("STOP")
+)
+
+type fileChecker struct {
+	// filename contains the filename that is used for checking.
+	filename string
+
+	// interval contains the time to sleep between health checks.
+	interval time.Duration
+
+	// current status
+	healthy    bool
+	err        error
+	isRunning  bool
+	lastUpdate time.Time
+
+	// commands is the channel to read commands (like stop) from.
+	commands chan command
+}
+
+func newFileChecker(dir string) (ConditionChecker, error) {
+	_, err := os.Stat(dir)
+	if os.IsNotExist(err) {
+		err = os.MkdirAll(dir, 0755)
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	return &fileChecker{
+		filename:   path.Join(dir, "csi-volume-condition.ts"),
+		healthy:    true,
+		interval:   120 * time.Second,
+		lastUpdate: time.Now(),
+		commands:   make(chan command),
+	}, nil
+}
+
+// runChecker is an endless loop that writes a timestamp and reads it back from
+// a file.
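+//
+// On a failed write or read, or when the timestamp that was read back does
+// not match what was written, the volume is marked unhealthy. The loop
+// sleeps fc.interval between iterations (see shouldStop).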
+func (fc *fileChecker) runChecker() {
+	fc.isRunning = true
+
+	for {
+		if fc.shouldStop() {
+			fc.isRunning = false
+
+			return
+		}
+
+		now := time.Now()
+
+		err := fc.writeTimestamp(now)
+		if err != nil {
+			fc.healthy = false
+			fc.err = err
+
+			continue
+		}
+
+		ts, err := fc.readTimestamp()
+		if err != nil {
+			fc.healthy = false
+			fc.err = err
+
+			continue
+		}
+
+		// verify that the written timestamp is read back
+		if now.Compare(ts) != 0 {
+			fc.healthy = false
+			fc.err = errors.New("timestamp read from file does not match what was written")
+
+			continue
+		}
+
+		// the health check passed: the timestamp was written to the
+		// file and read back unmodified
+		fc.healthy = true
+		fc.err = nil
+		fc.lastUpdate = ts
+	}
+}
+
+func (fc *fileChecker) shouldStop() bool {
+	start := time.Now()
+
+	for {
+		// check if we slept long enough to run the next check
+		slept := time.Since(start)
+		if slept >= fc.interval {
+			break
+		}
+
+		select {
+		case <-fc.commands:
+			// a command was received, need to stop checking
+			return true
+		default:
+			// continue with checking
+		}
+
+		time.Sleep(time.Second)
+	}
+
+	return false
+}
+
+func (fc *fileChecker) start() {
+	go fc.runChecker()
+}
+
+func (fc *fileChecker) stop() {
+	fc.commands <- stopCommand
+}
+
+func (fc *fileChecker) isHealthy() (bool, error) {
+	// The last update should be recent, within a certain window:
+	//
+	//     lastUpdate + (N x fc.interval)
+	//
+	// Without such a check, a single slow write/read could already
+	// trigger actions to recover an unhealthy volume. The check is also
+	// required in case the write or read in the go routine is blocked.
+	return fc.healthy, fc.err
+}
+
+// readTimestamp reads the JSON formatted timestamp from the file.
+func (fc *fileChecker) readTimestamp() (time.Time, error) {
+	var ts time.Time
+
+	data, err := os.ReadFile(fc.filename)
+	if err != nil {
+		return ts, err
+	}
+
+	err = ts.UnmarshalJSON(data)
+
+	return ts, err
+}
+
+// writeTimestamp writes the timestamp to the file in JSON format.
+func (fc *fileChecker) writeTimestamp(ts time.Time) error {
+	data, err := ts.MarshalJSON()
+	if err != nil {
+		return err
+	}
+
+	return os.WriteFile(fc.filename, data, 0644)
+}
diff --git a/internal/health-checker/filechecker_test.go b/internal/health-checker/filechecker_test.go
new file mode 100644
index 00000000000..56427a14b70
--- /dev/null
+++ b/internal/health-checker/filechecker_test.go
@@ -0,0 +1,81 @@
+/*
+Copyright 2023 ceph-csi authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package healthchecker
+
+import (
+	"path"
+	"testing"
+	"time"
+)
+
+func TestFileChecker(t *testing.T) {
+	t.Parallel()
+
+	volumePath := path.Join(t.TempDir(), "csi-health-check.d")
+	fc, err := newFileChecker(volumePath)
+	if err != nil {
+		t.Fatalf("failed to create FileChecker: %v", err)
+	}
+
+	checker := fc.(*fileChecker)
+	checker.interval = time.Second * 5
+
+	// start the checker
+	checker.start()
+
+	// wait a second to get the go routine running
+	time.Sleep(time.Second)
+	if !checker.isRunning {
+		t.Error("checker failed to start")
+	}
+
+	for i := 0; i < 10; i++ {
+		// check health, should be healthy
+		healthy, msg := checker.isHealthy()
+		if !healthy || msg != nil {
+			t.Error("volume is unhealthy")
+		}
+
+		time.Sleep(time.Second)
+	}
+
+	// stop the checker
+	checker.stop()
+}
+
+func TestWriteReadTimestamp(t *testing.T) {
+	t.Parallel()
+
+	volumePath := path.Join(t.TempDir(), "csi-health-check.d")
+	fc, err := newFileChecker(volumePath)
+	if err != nil {
+		t.Fatalf("failed to create FileChecker: %v", err)
+	}
+
+	checker := fc.(*fileChecker)
+	ts := time.Now()
+
+	err = checker.writeTimestamp(ts)
+	if err != nil {
+		t.Fatalf("failed to write timestamp: %v", err)
+	}
+
+	_, err = checker.readTimestamp()
+	if err != nil {
+		t.Fatalf("failed to read timestamp: %v", err)
+	}
+}
diff --git a/internal/health-checker/manager.go b/internal/health-checker/manager.go
new file mode 100644
index 00000000000..7451ab2bbe6
--- /dev/null
+++ b/internal/health-checker/manager.go
@@ -0,0 +1,106 @@
+/*
+Copyright 2023 ceph-csi authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package healthchecker
+
+import (
+	"fmt"
+	"sync"
+)
+
+// Manager provides the API for getting the health status of a volume. The main
+// usage is requesting the health status by path.
+//
+// When the Manager detects that a new path is used for checking, a new
+// instance of a ConditionChecker is created for the path, and started.
+//
+// Once the path is not active anymore (when NodeUnstageVolume is called), the
+// ConditionChecker needs to be stopped, which can be done by
+// Manager.StopChecker().
+type Manager interface {
+	StartChecker(path string) error
+	StopChecker(path string)
+	IsHealthy(path string) (bool, error)
+}
+
+// ConditionChecker describes the interface that a health status reporter needs
+// to implement. It is used internally by the Manager only.
+type ConditionChecker interface {
+	// start runs the health checking function in a new go routine.
+	start()
+
+	// stop terminates the health checking function that runs in a go
+	// routine.
+	stop()
+
+	// isHealthy returns the status of the volume, without blocking.
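+	// The result is the cached status from the most recent check.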
+	isHealthy() (bool, error)
+}
+
+type healthCheckManager struct {
+	checkers sync.Map // map[path]ConditionChecker
+}
+
+func NewHealthCheckManager() Manager {
+	return &healthCheckManager{
+		checkers: sync.Map{},
+	}
+}
+
+func (hcm *healthCheckManager) StartChecker(path string) error {
+	cc, err := newFileChecker(path)
+	if err != nil {
+		return err
+	}
+
+	// load the 'old' ConditionChecker if it exists, otherwise store 'cc'
+	old, ok := hcm.checkers.LoadOrStore(path, cc)
+	if ok {
+		// 'old' was loaded, cast it to ConditionChecker
+		cc = old.(ConditionChecker)
+	} else {
+		// 'cc' was stored, start it only once
+		cc.start()
+	}
+
+	return nil
+}
+
+func (hcm *healthCheckManager) StopChecker(path string) {
+	old, ok := hcm.checkers.LoadAndDelete(path)
+	if !ok {
+		// nothing was loaded, nothing to do
+		return
+	}
+
+	// 'old' was loaded, cast it to ConditionChecker
+	cc := old.(ConditionChecker)
+	cc.stop()
+}
+
+func (hcm *healthCheckManager) IsHealthy(path string) (bool, error) {
+	// load the 'old' ConditionChecker if it exists
+	old, ok := hcm.checkers.Load(path)
+	if !ok {
+		return true, fmt.Errorf("no ConditionChecker for path: %s", path)
+	}
+
+	// 'old' was loaded, cast it to ConditionChecker
+	cc := old.(ConditionChecker)
+
+	return cc.isHealthy()
+}
diff --git a/internal/health-checker/manager_test.go b/internal/health-checker/manager_test.go
new file mode 100644
index 00000000000..170379e5b01
--- /dev/null
+++ b/internal/health-checker/manager_test.go
@@ -0,0 +1,49 @@
+/*
+Copyright 2023 ceph-csi authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package healthchecker
+
+import (
+	"testing"
+)
+
+func TestManager(t *testing.T) {
+	t.Parallel()
+
+	volumePath := t.TempDir()
+	mgr := NewHealthCheckManager()
+
+	// expected to have an error in msg
+	healthy, msg := mgr.IsHealthy(volumePath)
+	if !(healthy && msg != nil) {
+		t.Error("ConditionChecker was not started yet, did not get an error")
+	}
+
+	t.Log("start the checker")
+	err := mgr.StartChecker(volumePath)
+	if err != nil {
+		t.Fatalf("ConditionChecker could not get started: %v", err)
+	}
+
+	t.Log("check health, should be healthy")
+	healthy, msg = mgr.IsHealthy(volumePath)
+	if !healthy || msg != nil {
+		t.Error("volume is unhealthy")
+	}
+
+	t.Log("stop the checker")
+	mgr.StopChecker(volumePath)
+}