
Errors in concurrent access to cached files #80

Closed
ludaavics opened this issue Apr 18, 2023 · 6 comments

@ludaavics
Contributor

ludaavics commented Apr 18, 2023

Describe the bug
Concurrent access to a cached file throws an exception.

To Reproduce

import asyncio
import httpx_cache
import shutil

cache_folder = "./httpx-cache-tests/"
try:
    shutil.rmtree(cache_folder)
except FileNotFoundError:
    pass

async def worker(i):
    print(f"Entering {i}")
    async with httpx_cache.AsyncClient(cache=httpx_cache.FileCache(cache_folder)) as client:
        result = await client.get("http://www.google.com")
        print(f"Exiting  {i}")
        return result

# Works; if my mental model is correct, this writes the cache file 10x in a row.
# (Run in a Jupyter notebook, hence the top-level await.)
tasks = [worker(i) for i in range(10)]
foo = await asyncio.gather(*tasks, return_exceptions=False)

# Raises an error, though it works if the number of tasks is small??
tasks = [worker(i) for i in range(100)]
foo = await asyncio.gather(*tasks, return_exceptions=False)

On the first run, the above raises a FileNotFoundError:

FileNotFoundError                         Traceback (most recent call last)
Cell In [1], line 24
     22 #  raises an error, though it works if the number of tasks is small ??
     23 tasks = [worker(i) for i in range(100)]
---> 24 foo = await asyncio.gather(*tasks, return_exceptions=False)  

Cell In [1], line 14, in worker(i)
     12 print(f"Entering {i}")
     13 async with httpx_cache.AsyncClient(cache=httpx_cache.FileCache(cache_folder)) as client:
---> 14     result = await client.get("http://www.google.com")
     15     print(f"Exiting  {i}")
     16     return result

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx/_client.py:1751, in AsyncClient.get(self, url, params, headers, cookies, auth, follow_redirects, timeout, extensions)
   1734 async def get(
   1735     self,
   1736     url: URLTypes,
   (...)
   1744     extensions: typing.Optional[dict] = None,
   1745 ) -> Response:
   1746     """
   1747     Send a `GET` request.
   1748 
   1749     **Parameters**: See `httpx.request`.
   1750     """
-> 1751     return await self.request(
   1752         "GET",
   1753         url,
   1754         params=params,
   1755         headers=headers,
   1756         cookies=cookies,
   1757         auth=auth,
   1758         follow_redirects=follow_redirects,
   1759         timeout=timeout,
   1760         extensions=extensions,
   1761     )

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx/_client.py:1527, in AsyncClient.request(self, method, url, content, data, files, json, params, headers, cookies, auth, follow_redirects, timeout, extensions)
   1498 """
   1499 Build and send a request.
   1500 
   (...)
   1512 [0]: /advanced/#merging-of-configuration
   1513 """
   1514 request = self.build_request(
   1515     method=method,
   1516     url=url,
   (...)
   1525     extensions=extensions,
   1526 )
-> 1527 return await self.send(request, auth=auth, follow_redirects=follow_redirects)

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx/_client.py:1614, in AsyncClient.send(self, request, stream, auth, follow_redirects)
   1606 follow_redirects = (
   1607     self.follow_redirects
   1608     if isinstance(follow_redirects, UseClientDefault)
   1609     else follow_redirects
   1610 )
   1612 auth = self._build_request_auth(request, auth)
-> 1614 response = await self._send_handling_auth(
   1615     request,
   1616     auth=auth,
   1617     follow_redirects=follow_redirects,
   1618     history=[],
   1619 )
   1620 try:
   1621     if not stream:

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx/_client.py:1642, in AsyncClient._send_handling_auth(self, request, auth, follow_redirects, history)
   1639 request = await auth_flow.__anext__()
   1641 while True:
-> 1642     response = await self._send_handling_redirects(
   1643         request,
   1644         follow_redirects=follow_redirects,
   1645         history=history,
   1646     )
   1647     try:
   1648         try:

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx/_client.py:1679, in AsyncClient._send_handling_redirects(self, request, follow_redirects, history)
   1676 for hook in self._event_hooks["request"]:
   1677     await hook(request)
-> 1679 response = await self._send_single_request(request)
   1680 try:
   1681     for hook in self._event_hooks["response"]:

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx/_client.py:1716, in AsyncClient._send_single_request(self, request)
   1711     raise RuntimeError(
   1712         "Attempted to send an sync request with an AsyncClient instance."
   1713     )
   1715 with request_context(request=request):
-> 1716     response = await transport.handle_async_request(request)
   1718 assert isinstance(response.stream, AsyncByteStream)
   1719 response.request = request

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx_cache/transport.py:118, in AsyncCacheControlTransport.handle_async_request(self, request)
    116 if self.controller.is_request_cacheable(request):
    117     logger.debug(f"Checking cache for: {request}")
--> 118     cached_response = await self.cache.aget(request)
    119     if cached_response is not None:
    120         logger.debug(f"Found cached response for: {request}")

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx_cache/cache/file.py:65, in FileCache.aget(self, request)
     63 if await filepath.is_file():
     64     async with RWLock().reader:
---> 65         cached = await filepath.read_bytes()
     66     return self.serializer.loads(request=request, cached=cached)
     67 return None

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/anyio/_core/_fileio.py:505, in Path.read_bytes(self)
    504 async def read_bytes(self) -> bytes:
--> 505     return await to_thread.run_sync(self._path.read_bytes)

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/anyio/to_thread.py:31, in run_sync(func, cancellable, limiter, *args)
     10 async def run_sync(
     11     func: Callable[..., T_Retval],
     12     *args: object,
     13     cancellable: bool = False,
     14     limiter: Optional[CapacityLimiter] = None
     15 ) -> T_Retval:
     16     """
     17     Call the given function with the given arguments in a worker thread.
     18 
   (...)
     29 
     30     """
---> 31     return await get_asynclib().run_sync_in_worker_thread(
     32         func, *args, cancellable=cancellable, limiter=limiter
     33     )

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread(func, cancellable, limiter, *args)
    935 context.run(sniffio.current_async_library_cvar.set, None)
    936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:867, in WorkerThread.run(self)
    865 exception: Optional[BaseException] = None
    866 try:
--> 867     result = context.run(func, *args)
    868 except BaseException as exc:
    869     exception = exc

File ~/miniconda3/envs/dev/lib/python3.9/pathlib.py:1249, in Path.read_bytes(self)
   1245 def read_bytes(self):
   1246     """
   1247     Open the file in bytes mode, read it, and close the file.
   1248     """
-> 1249     with self.open(mode='rb') as f:
   1250         return f.read()

File ~/miniconda3/envs/dev/lib/python3.9/pathlib.py:1242, in Path.open(self, mode, buffering, encoding, errors, newline)
   1236 def open(self, mode='r', buffering=-1, encoding=None,
   1237          errors=None, newline=None):
   1238     """
   1239     Open the file pointed by this path and return a file object, as
   1240     the built-in open() function does.
   1241     """
-> 1242     return io.open(self, mode, buffering, encoding, errors, newline,
   1243                    opener=self._opener)

File ~/miniconda3/envs/dev/lib/python3.9/pathlib.py:1110, in Path._opener(self, name, flags, mode)
   1108 def _opener(self, name, flags, mode=0o666):
   1109     # A stub for the opener argument to built-in open()
-> 1110     return self._accessor.open(self, flags, mode)

FileNotFoundError: [Errno 2] No such file or directory: 'httpx-cache-tests/9ebb612c68d6ddc348478732369ddd52e58ab325704a3beda73e90b8'

On subsequent runs in the same Jupyter notebook session, the same code raises a msgpack error instead (presumably a reader sees a partially written cache file):

---------------------------------------------------------------------------
ExtraData                                 Traceback (most recent call last)
Cell In [3], line 23
     19 foo = await asyncio.gather(*tasks, return_exceptions=False)
     22 tasks = [worker(i) for i in range(100)]
---> 23 foo = await asyncio.gather(*tasks, return_exceptions=False)

Cell In [3], line 14, in worker(i)
     12 print(f"Entering {i}")
     13 async with httpx_cache.AsyncClient(cache=httpx_cache.FileCache(cache_folder)) as client:
---> 14     result = await client.get("http://www.google.com")
     15     print(f"Exiting  {i}")
     16     return result

[... same httpx._client and httpx_cache.transport frames as in the first traceback ...]

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx_cache/cache/file.py:66, in FileCache.aget(self, request)
     64     async with RWLock().reader:
     65         cached = await filepath.read_bytes()
---> 66     return self.serializer.loads(request=request, cached=cached)
     67 return None

File ~/miniconda3/envs/dev/lib/python3.9/site-packages/httpx_cache/serializer/common.py:172, in MsgPackSerializer.loads(self, cached, request)
    168 def loads(  # type: ignore
    169     self, *, cached: bytes, request: tp.Optional[httpx.Request] = None
    170 ) -> httpx.Response:
    171     """Load an httpx.Response from a msgapck bytes."""
--> 172     return super().loads(cached=msgpack.loads(cached, raw=False), request=request)

File msgpack/_unpacker.pyx:201, in msgpack._cmsgpack.unpackb()

ExtraData: unpack(b) received extra data.

Expected behavior
Concurrent access to an already-cached file should be fine?

Additional context
I probably brought this on myself

@obendidi
Owner

I'm not sure I understand why you need to run requests to the same URL concurrently?

httpx_cache does not support requesting the same URL concurrently at the same time, and I'm not sure this is a valid use case.

@ludaavics
Contributor Author

Thanks for looking into this and for your work on the library! It has been very helpful and a huge time saver for me.

I am probably not thinking about this clearly, but I guess I'm confused about what the read/write locks are for, if not to support concurrent requests to the same URL?
Also, I could understand getting in trouble when multiple processes race to read/write the initial value to the cache, but once the file is there, I don't follow what the issue is with concurrent reads.

@obendidi
Owner

You are absolutely right, forgot about the different locks in place for a bit there 😓

The read/write locks should prevent reading a cached response while another thread/task is writing it 🤔

I've gone with the simplest solution of using an external lib to handle that, but there is probably something goofy there. I'll take a more detailed look when I have the time; also happy to take any contribution if you have a fix in mind.
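
For reference, here is a minimal sketch (mine, not httpx_cache code) of the semantics a properly shared aiorwlock.RWLock should give: readers may overlap each other, while a writer holds the lock exclusively.

import asyncio
import aiorwlock

async def main():
    lock = aiorwlock.RWLock()  # one lock instance shared by all tasks

    async def reader(i):
        async with lock.reader_lock:
            print(f"reader {i} in")  # several readers can be "in" at once
            await asyncio.sleep(0.1)
        print(f"reader {i} out")

    async def writer():
        async with lock.writer_lock:
            print("writer in")  # no reader or other writer overlaps this
            await asyncio.sleep(0.1)
        print("writer out")

    await asyncio.gather(reader(1), reader(2), writer())

asyncio.run(main())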

@ludaavics
Copy link
Contributor Author

Thanks! Happy to contribute where I can, but for now I am mostly puzzled. Will take a look at the locking library and let you know if I come up with anything.

@ludaavics
Contributor Author

ludaavics commented Apr 20, 2023

First, when reading the cache, I wonder if we should acquire the lock before checking whether the file exists. There could be a race condition where the cached file gets deleted (because it is no longer fresh) between the moment one worker checks that the cached file exists and the moment it actually reads the file.
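
Something like this reordering would close that window. This is a hypothetical sketch based on the file.py frames in the tracebacks above; the path helper _filepath_for and the shared lock attribute self._lock are assumed names:

# Do the existence check *inside* the reader lock, so the file cannot
# be deleted between the check and the read. Assumes one lock shared
# by every accessor of the cache, not a fresh RWLock() per call.
async def aget(self, request):
    filepath = self._filepath_for(request)
    async with self._lock.reader:
        if await filepath.is_file():
            cached = await filepath.read_bytes()
            return self.serializer.loads(request=request, cached=cached)
    return None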

Second, I don't know if this was clear to you, but it looks like the locks are at an instance level, not at a process level, let alone cross-process:

import asyncio
import aiorwlock

async def worker():
    rwlock = aiorwlock.RWLock()  # a fresh lock per task: it excludes nothing

    async with rwlock.writer_lock:
        print(f'{asyncio.current_task().get_name()}: inside writer lock')
        await asyncio.sleep(0.1)
    print(f'{asyncio.current_task().get_name()}: gave up writer lock')

tasks = [worker() for i in range(10)]
foo = await asyncio.gather(*tasks, return_exceptions=False)


Task-5: inside writer lock
Task-6: inside writer lock
Task-7: inside writer lock
Task-8: inside writer lock
Task-9: inside writer lock
Task-10: inside writer lock
Task-11: inside writer lock
Task-12: inside writer lock
Task-13: inside writer lock
Task-14: inside writer lock
Task-5: gave up writer lock
Task-6: gave up writer lock
Task-7: gave up writer lock
Task-8: gave up writer lock
Task-9: gave up writer lock
Task-10: gave up writer lock
Task-11: gave up writer lock
Task-12: gave up writer lock
Task-13: gave up writer lock
Task-14: gave up writer lock
With a single module-level lock shared by all tasks, by contrast, access is serialized as expected:

import asyncio
import aiorwlock

rwlock = aiorwlock.RWLock()  # one lock shared by every task

async def worker():

    async with rwlock.writer_lock:
        print(f'{asyncio.current_task().get_name()}: inside writer lock')
        await asyncio.sleep(0.1)
    print(f'{asyncio.current_task().get_name()}: gave up writer lock')

tasks = [worker() for i in range(10)]
foo = await asyncio.gather(*tasks, return_exceptions=False)

Task-16: inside writer lock
Task-16: gave up writer lock
Task-17: inside writer lock
Task-17: gave up writer lock
Task-18: inside writer lock
Task-18: gave up writer lock
Task-19: inside writer lock
Task-19: gave up writer lock
Task-20: inside writer lock
Task-20: gave up writer lock
Task-21: inside writer lock
Task-21: gave up writer lock
Task-22: inside writer lock
Task-22: gave up writer lock
Task-23: inside writer lock
Task-23: gave up writer lock
Task-24: inside writer lock
Task-24: gave up writer lock
Task-25: inside writer lock
Task-25: gave up writer lock

So in my initial example, where each worker creates its own client, nothing stops multiple workers from touching the cache file at the same time: hence the race conditions where a file gets deleted by one worker while another is trying to read it. In real life, my workers run on different compute nodes, so there is no easy way for me to share a single httpx client instance.

I see two options:

  • catch FileNotFoundError in aget and return None (sketched below). This fixes the symptom and is technically correct, but true concurrent access is still not supported
  • switch to an inter-process lock, e.g. FileLock. There are multiple async workarounds here, here or here

Let me know what you think

edit: looking more closely, it looks like filelock doesn't allow multiple readers, so we would have to either disallow concurrent reads (slower) or allow files to be deleted while they're being read...
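
A sketch of the first option, with the same assumed names as in my earlier sketch (_filepath_for, self._lock):

# Treat a file that vanishes mid-read as a cache miss instead of crashing.
async def aget(self, request):
    filepath = self._filepath_for(request)
    try:
        async with self._lock.reader:
            cached = await filepath.read_bytes()
    except FileNotFoundError:
        return None  # deleted by another worker between check and read
    return self.serializer.loads(request=request, cached=cached)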

@obendidi
Owner

obendidi commented Oct 3, 2023

Closing because this project will be archived in favor of: https://github.com/karosis88/hishel

@obendidi closed this as completed Oct 3, 2023