Skip to content

Commit

Permalink
fix(aiohttp): Use a contextvar to track the request span for streamin…
Browse files Browse the repository at this point in the history
…g responses(#8108)

Without this change, the request and response objects are not freed from memory until the asyncio Task is freed, which can create a memory leak. This change leverages a contextvar to accomplish the same result as the previous version, while ensuring any memory is freed once the current async context is exited.
  • Loading branch information
Sean Stewart committed Nov 24, 2024
1 parent defea4e commit 446c43b
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions ddtrace/contrib/internal/aiohttp/middlewares.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import contextvars

from aiohttp import web
from aiohttp.web_urldispatcher import SystemRoute

Expand Down Expand Up @@ -70,8 +72,6 @@ async def attach_context(request):
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
request_span.set_traceback()
Expand Down Expand Up @@ -132,19 +132,30 @@ def finish_request_span(request, response):
response_headers=response.headers,
route=route,
)
if type(response) is web.StreamResponse and not response.task.done():
request_span_var.set(request_span)
request.task.add_done_callback(span_done_callback)

request_span.finish()


def span_done_callback(task):
span = request_span_var.get(None)
if span:
span.finish()
request_span_var.set(None)


request_span_var = contextvars.ContextVar("__dd_request_span")


async def on_prepare(request, response):
"""
The on_prepare signal is used to close the request span that is created during
the trace middleware execution.
"""
# NB isinstance is not appropriate here because StreamResponse is a parent of the other
# aiohttp response types
if type(response) is web.StreamResponse and not response.task.done():
return
finish_request_span(request, response)


Expand Down

0 comments on commit 446c43b

Please sign in to comment.