
Commit: migrate cleaner

malmans2 committed Mar 18, 2024
1 parent da4d5e3 commit b46c128
Showing 2 changed files with 94 additions and 107 deletions.
144 changes: 64 additions & 80 deletions cacholote/clean.py
@@ -17,8 +17,6 @@

import collections
import datetime
import functools
import json
import posixpath
from typing import Any, Callable, Literal, Optional

@@ -53,58 +51,23 @@ def _get_files_from_cache_entry(cache_entry: database.CacheEntry) -> dict[str, str]:
return files


def _delete_cache_file(
obj: dict[str, Any],
session: sa.orm.Session | None = None,
cache_entry_id: int | None = None,
sizes: dict[str, int] | None = None,
dry_run: bool = False,
) -> Any:
logger = config.get().logger

if {"type", "callable", "args", "kwargs"} == set(obj) and obj["callable"] in (
"cacholote.extra_encoders:decode_xr_dataarray",
"cacholote.extra_encoders:decode_xr_dataset",
"cacholote.extra_encoders:decode_io_object",
):
sizes = sizes or {}
cache_fs, cache_dirname = utils.get_cache_files_fs_dirname()
cache_dirname = cache_fs.unstrip_protocol(cache_dirname)

fs, urlpath = extra_encoders._get_fs_and_urlpath(*obj["args"][:2])
urlpath = fs.unstrip_protocol(urlpath)

if posixpath.dirname(urlpath) == cache_dirname:
size = sizes.pop(urlpath, 0)
if not dry_run:
if session:
for cache_entry in session.scalars(
sa.select(database.CacheEntry).filter(
database.CacheEntry.id == cache_entry_id
)
):
logger.info("delete cache entry", cache_entry=cache_entry)
session.delete(cache_entry)
database._commit_or_rollback(session)

if fs.exists(urlpath):
logger.info("delete cache file", urlpath=urlpath, size=size)
fs.rm(
urlpath,
recursive=obj["args"][0]["type"] == "application/vnd+zarr",
)

return obj


def _delete_cache_entry(
session: sa.orm.Session, cache_entry: database.CacheEntry
) -> None:
fs, _ = utils.get_cache_files_fs_dirname()
files_to_delete = _get_files_from_cache_entry(cache_entry)
logger = config.get().logger

# First, delete database entry
logger.info("deleting cache entry", cache_entry=cache_entry)
session.delete(cache_entry)
database._commit_or_rollback(session)

# Then, delete files
json.loads(cache_entry._result_as_string, object_hook=_delete_cache_file)
for urlpath, file_type in files_to_delete.items():
if fs.exists(urlpath):
logger.info("deleting cache file", urlpath=urlpath)
fs.rm(urlpath, recursive=file_type == "application/vnd+zarr")
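
The deleted _delete_cache_file helper piggybacked on json.loads' object_hook, deleting files as a side effect of decoding the stored result; the rewritten _delete_cache_entry instead collects paths up front via _get_files_from_cache_entry and removes them explicitly. A minimal, self-contained sketch of the object_hook pattern being retired (the payload shape is simplified for illustration):

import json

payload = '{"callable": "decode_io_object", "args": [{"urlpath": "/cache/foo.txt"}]}'

def visit(obj: dict) -> dict:
    # object_hook runs for every decoded JSON object, so it can be (ab)used
    # to trigger side effects such as deleting the files an entry references.
    if "urlpath" in obj:
        print("would delete", obj["urlpath"])
    return obj

json.loads(payload, object_hook=visit)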


def delete(func_to_del: str | Callable[..., Any], *args: Any, **kwargs: Any) -> None:
Expand Down Expand Up @@ -134,30 +97,33 @@ def __init__(self) -> None:

urldir = self.fs.unstrip_protocol(self.dirname)

self.logger.info("get disk usage of cache files")
self.sizes: dict[str, int] = collections.defaultdict(int)
self.logger.info("getting disk usage")
self.file_sizes: dict[str, int] = collections.defaultdict(int)
for path, size in self.fs.du(self.dirname, total=False).items():
# Group dirs
urlpath = self.fs.unstrip_protocol(path)
basename, *_ = urlpath.replace(urldir, "", 1).strip("/").split("/")
if basename:
self.sizes[posixpath.join(urldir, basename)] += size
self.file_sizes[posixpath.join(urldir, basename)] += size

self.log_disk_usage()
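
The constructor groups the per-file sizes returned by fs.du(self.dirname, total=False) by top-level entry, so a multi-file store (e.g. a zarr directory) is accounted for as a single item. A standalone sketch of that grouping, with a made-up du() result:

import collections
import posixpath

urldir = "file:///cache"
du = {  # hypothetical fs.du(dirname, total=False) output
    "/cache/a.txt": 10,
    "/cache/store.zarr/0.0": 7,
    "/cache/store.zarr/0.1": 5,
}
file_sizes: dict[str, int] = collections.defaultdict(int)
for path, size in du.items():
    urlpath = "file://" + path  # stand-in for fs.unstrip_protocol(path)
    basename, *_ = urlpath.replace(urldir, "", 1).strip("/").split("/")
    if basename:
        file_sizes[posixpath.join(urldir, basename)] += size
print(dict(file_sizes))
# {'file:///cache/a.txt': 10, 'file:///cache/store.zarr': 12}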

@property
def size(self) -> int:
sum_sizes = sum(self.sizes.values())
self.logger.info("check cache files total size", size=sum_sizes)
return sum_sizes
def disk_usage(self) -> int:
return sum(self.file_sizes.values())

def log_disk_usage(self) -> None:
self.logger.info("disk usage check", disk_usage=self.disk_usage)

def stop_cleaning(self, maxsize: int) -> bool:
return self.size <= maxsize
return self.disk_usage <= maxsize

def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
def get_unknown_sizes(self, lock_validity_period: float | None) -> dict[str, int]:
self.logger.info("getting unknown files")

utcnow = utils.utcnow()
files_to_skip = []
for urlpath in self.sizes:
for urlpath in self.file_sizes:
if urlpath.endswith(".lock"):
modified = self.fs.modified(urlpath)
if modified.tzinfo is None:
@@ -169,30 +135,28 @@ def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
files_to_skip.append(urlpath)
files_to_skip.append(urlpath.rsplit(".lock", 1)[0])

unknown_sizes = {k: v for k, v in self.sizes.items() if k not in files_to_skip}
unknown_sizes = {
k: v for k, v in self.file_sizes.items() if k not in files_to_skip
}
if unknown_sizes:
with config.get().instantiated_sessionmaker() as session:
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
json.loads(
cache_entry._result_as_string,
object_hook=functools.partial(
_delete_cache_file,
sizes=unknown_sizes,
dry_run=True,
),
)
return set(unknown_sizes)
for file in _get_files_from_cache_entry(cache_entry):
unknown_sizes.pop(file, None)
return unknown_sizes
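
The middle of this method is collapsed in this view; conceptually, a *.lock file is honoured while it is younger than lock_validity_period, and naive fs.modified() timestamps are coerced to UTC before the comparison. A hedged sketch of that check (the helper name is invented for illustration):

import datetime

def lock_is_live(modified: datetime.datetime, lock_validity_period: float | None) -> bool:
    # fs.modified() may return a naive datetime; treat it as UTC.
    if modified.tzinfo is None:
        modified = modified.replace(tzinfo=datetime.timezone.utc)
    if lock_validity_period is None:
        return True  # locks never expire
    age = datetime.datetime.now(datetime.timezone.utc) - modified
    return age.total_seconds() < lock_validity_period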

def delete_unknown_files(
self, lock_validity_period: float | None, recursive: bool
) -> None:
for urlpath in self.get_unknown_files(lock_validity_period):
size = self.sizes.pop(urlpath)
if self.fs.exists(urlpath):
self.logger.info(
"delete unknown", urlpath=urlpath, size=size, recursive=recursive
)
self.fs.rm(urlpath, recursive=recursive)
unknown_sizes = self.get_unknown_sizes(lock_validity_period)
for urlpath in unknown_sizes:
self.file_sizes.pop(urlpath)
self.remove_files(
list(unknown_sizes),
recursive=recursive,
msg="deleting unknown files",
)
self.log_disk_usage()
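
delete_unknown_files drops the doomed paths from file_sizes and hands them to remove_files. The fsspec primitives it leans on (du for sizes, rm with recursive= for directory-like stores) behave as in this self-contained sketch on the in-memory filesystem:

import fsspec

fs = fsspec.filesystem("memory")
fs.pipe_file("/cache/a.txt", b"12345")
fs.pipe_file("/cache/store.zarr/0.0", b"123")

print(fs.du("/cache", total=False))  # per-path sizes rather than a grand total
fs.rm("/cache/store.zarr", recursive=True)  # directory-like entries need recursive=True
fs.rm("/cache/a.txt")
print(fs.exists("/cache/a.txt"))  # False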

@staticmethod
@pydantic.validate_call
@@ -239,15 +203,21 @@ def _get_method_sorters(
sorters.append(database.CacheEntry.expiration)
return sorters

def _remove_files(
def remove_files(
self,
files: list[str],
max_tries: int = 10,
msg: str = "deleting cache files",
**kwargs: Any,
) -> None:
assert max_tries >= 1

if files:
self.logger.info("deleting files", urlpath=len(files))
self.logger.info(
msg,
number_of_files=len(files),
recursive=kwargs.get("recursive", False),
)

n_tries = 0
while files and n_tries < max_tries:
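
The rest of the loop is collapsed in this view. A hypothetical sketch of the bounded-retry deletion this method performs, tolerating races where another process removes files first (remove_with_retries and its exact error handling are assumptions, not the actual body):

import fsspec

def remove_with_retries(
    fs: fsspec.AbstractFileSystem, files: list[str], max_tries: int = 10, **kwargs
) -> None:
    n_tries = 0
    while files and n_tries < max_tries:
        n_tries += 1
        try:
            fs.rm(files, **kwargs)  # fsspec rm accepts a list of paths
            files = []
        except FileNotFoundError:
            # Another process deleted something mid-way; keep what's left and retry.
            files = [file for file in files if fs.exists(file)]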
@@ -277,31 +247,45 @@ def delete_cache_files(

files_to_delete = []
dirs_to_delete = []
self.logger.info("getting cache entries to delete")
number_of_cache_entries = 0
with config.get().instantiated_sessionmaker() as session:
for cache_entry in session.scalars(
sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
):
files = _get_files_from_cache_entry(cache_entry)
if files:
number_of_cache_entries += 1
session.delete(cache_entry)

for file, file_type in files.items():
self.sizes.pop(file, 0)
self.file_sizes.pop(file, 0)
if file_type == "application/vnd+zarr":
dirs_to_delete.append(file)
else:
files_to_delete.append(file)

if self.stop_cleaning(maxsize):
break

if number_of_cache_entries:
self.logger.info(
"deleting cache entries",
number_of_cache_entries=number_of_cache_entries,
)
database._commit_or_rollback(session)

self._remove_files(files_to_delete, recursive=False)
self._remove_files(dirs_to_delete, recursive=True)
self.remove_files(files_to_delete, recursive=False)
self.remove_files(dirs_to_delete, recursive=True)
self.log_disk_usage()

if not self.stop_cleaning(maxsize):
raise ValueError(
f"Unable to clean {self.dirname!r}. Final size: {self.size!r}. Expected size: {maxsize!r}"
(
f"Unable to clean {self.dirname!r}."
f" Final disk usage: {self.disk_usage!r}."
f" Expected disk usage: {maxsize!r}"
)
)
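
Taken together, delete_cache_files walks entries in sorter order, deletes them until disk usage drops to maxsize, and raises if the target was not reached. A usage sketch mirroring the invocation in the tests below (the maxsize value here is illustrative):

from cacholote import clean

# Prune cached files down to at most 1 MiB, also removing files that have
# no corresponding database entry.
clean.clean_cache_files(2**20, delete_unknown_files=True)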


57 changes: 30 additions & 27 deletions tests/test_60_clean.py
@@ -2,6 +2,7 @@

import contextlib
import datetime
import os
import pathlib
import time
from typing import Any, Literal
@@ -13,6 +13,7 @@

from cacholote import cache, clean, config, utils

ONE_BYTE = os.urandom(1)
does_not_raise = contextlib.nullcontext


@@ -41,7 +43,7 @@ def test_clean_cache_files(
# Create files
for algorithm in ("LRU", "LFU"):
filename = tmp_path / f"{algorithm}.txt"
fsspec.filesystem("file").pipe_file(filename, b"1")
fsspec.filesystem("file").pipe_file(filename, ONE_BYTE)

# Copy to cache
(lru_path,) = {open_url(tmp_path / "LRU.txt").path for _ in range(2)}
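
The set-comprehension unpacking above doubles as a determinism check; spelled out, reusing the test's open_url helper and tmp_path fixture:

# Calling the cached function twice must reuse one file, or unpacking fails.
paths = {open_url(tmp_path / "LRU.txt").path for _ in range(2)}
assert len(paths) == 1
(lru_path,) = paths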
@@ -68,7 +70,7 @@ def test_delete_unknown_files(

# Create file
tmpfile = tmp_path / "test.txt"
fsspec.filesystem("file").pipe_file(tmpfile, b"1")
fsspec.filesystem("file").pipe_file(tmpfile, ONE_BYTE)

# Copy to cache
cached_file = open_url(tmpfile).path
@@ -113,7 +115,7 @@ def test_clean_locked_files(

# Create file
tmpfile = tmp_path / "test.txt"
fsspec.filesystem("file").pipe_file(tmpfile, b"1")
fsspec.filesystem("file").pipe_file(tmpfile, ONE_BYTE)

# Copy to cache
cached_file = open_url(tmpfile).path
@@ -157,7 +159,7 @@ def test_clean_tagged_files(
expected_ls = []
for tag in [None, "1", "2"]:
tmpfile = tmp_path / f"test_{tag}.txt"
fsspec.filesystem("file").pipe_file(tmpfile, b"1")
fsspec.filesystem("file").pipe_file(tmpfile, ONE_BYTE)
with config.set(tag=tag):
cached_file = open_url(tmpfile).path
if tag not in cleaned:
@@ -214,16 +216,16 @@ def test_clean_invalid_cache_entries(
fs, dirname = utils.get_cache_files_fs_dirname()

# Valid cache file
fsspec.filesystem("file").pipe_file(tmp_path / "valid.txt", b"1")
fsspec.filesystem("file").pipe_file(tmp_path / "valid.txt", ONE_BYTE)
valid = open_url(tmp_path / "valid.txt").path

# Corrupted cache file
fsspec.filesystem("file").pipe_file(tmp_path / "corrupted.txt", b"1")
fsspec.filesystem("file").pipe_file(tmp_path / "corrupted.txt", ONE_BYTE)
corrupted = open_url(tmp_path / "corrupted.txt").path
fs.touch(corrupted)

# Expired cache file
fsspec.filesystem("file").pipe_file(tmp_path / "expired.txt", b"1")
fsspec.filesystem("file").pipe_file(tmp_path / "expired.txt", ONE_BYTE)
with config.set(expiration=utils.utcnow() + datetime.timedelta(seconds=0.2)):
expired = open_url(tmp_path / "expired.txt").path
time.sleep(0.2)
@@ -255,51 +257,52 @@ def test_cleaner_logging(
) -> None:
# Cache file and create unknown
tmpfile = tmp_path / "test.txt"
fsspec.filesystem("file").pipe_file(tmpfile, b"1")
cached_file = open_url(tmpfile)
fsspec.filesystem("file").pipe_file(tmpfile, ONE_BYTE)
open_url(tmpfile)
fs, dirname = utils.get_cache_files_fs_dirname()
fs.pipe_file(f"{dirname}/unknown.txt", b"1")
fs.pipe_file(f"{dirname}/unknown.txt", ONE_BYTE)

# Clean
config.set(logger=structlog.get_logger())
clean.clean_cache_files(0, delete_unknown_files=True)
captured = iter(capsys.readouterr().out.splitlines())

sep = " " * 15
line = next(captured)
assert "getting disk usage" in line

line = next(captured)
assert "get disk usage of cache files" in line
assert line.endswith(f"disk usage check{sep}disk_usage=2")

line = next(captured)
assert "get unknown files" in line
assert "getting unknown files" in line

line = next(captured)
assert "delete unknown" in line
assert "recursive=False" in line
assert f"urlpath=file://{dirname}/unknown.txt" in line
assert "size=1" in line
line.endswith(f"deleting unknown files{sep}number_of_files=1{sep}recursive=False")

line = next(captured)
assert "check cache files total size" in line
assert "size=1" in line
line.endswith(f"disk usage check{sep}disk_usage=1")

line = next(captured)
assert "delete cache entry" in line
assert "cache_entry=" in line
assert "getting cache entries to delete" in line

line = next(captured)
assert "delete cache file" in line
assert f"urlpath=file://{cached_file.path}" in line
assert "size=1" in line
line.endswith(f"deleting cache entries{sep}number_of_cache_entries=1")

line = next(captured)
assert "check cache files total size" in line
assert "size=0" in line
line.endswith(f"deleting cache files{sep}number_of_files=1{sep}recursive=False")

line = next(captured)
line.endswith(f"disk usage check{sep}disk_usage=0")

assert next(captured, None) is None
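
The sep of 15 spaces presumably reflects structlog's console renderer padding the event name before the key=value pairs. A quick way to see the format these checks parse (exact output depends on structlog's defaults):

import structlog

logger = structlog.get_logger()
logger.info("disk usage check", disk_usage=2)
# Prints something like:
# 2024-03-18 12:00:00 [info     ] disk usage check               disk_usage=2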


def test_clean_multiple_files(tmp_path: pathlib.Path) -> None:
fs, dirname = utils.get_cache_files_fs_dirname()

fsspec.filesystem("file").pipe_file(tmp_path / "test1.txt", b"1")
fsspec.filesystem("file").pipe_file(tmp_path / "test2.txt", b"2")
fsspec.filesystem("file").pipe_file(tmp_path / "test1.txt", ONE_BYTE)
fsspec.filesystem("file").pipe_file(tmp_path / "test2.txt", ONE_BYTE)

open_urls(tmp_path / "test1.txt", tmp_path / "test2.txt")
assert len(fs.ls(dirname)) == 2
