-
I notice you're using a sync client on an async endpoint. Could you test with an async client?
-
As mentioned, the sync test and the async test behave the same way. Here is the async test code:

import asyncio
import sys
from collections.abc import AsyncIterator

import pytest
from litestar.channels import ChannelsPlugin
from litestar.testing import AsyncTestClient, TestClient
from httpx_sse import aconnect_sse, EventSource, connect_sse

from app import app

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

pytestmark = pytest.mark.anyio


@pytest.fixture
def anyio_backend() -> str:
    return "asyncio"


@pytest.fixture(name="client")
async def fx_client() -> AsyncIterator[AsyncTestClient]:
    async with AsyncTestClient(app=app) as client:
        yield client


@pytest.fixture(name="channels")
def fx_channels(client: AsyncTestClient) -> ChannelsPlugin:
    return client.app.plugins.get(ChannelsPlugin)


async def test_example_sse(client: AsyncTestClient):
    async with aconnect_sse(client, "GET", f"{client.base_url}/example-sse") as event_source:
        events = [sse async for sse in event_source.aiter_sse()]
        assert len(events) == 5
        for idx, sse in enumerate(events, start=1):
            assert sse.event == "special"
            assert sse.data == str(idx)
            assert sse.id == "123"
            assert sse.retry == 1000


async def test_notification(client: AsyncTestClient, channels: ChannelsPlugin):
    topic = "demo"
    message = {"hello": "world"}
    async with aconnect_sse(
        client,
        "GET",
        f"{client.base_url}/{topic}/notification",
        timeout=1
    ) as event_source:
        channels.publish(message, [topic])
        # anext() returns an awaitable, so it has to be awaited to get the event
        received = await anext(event_source.aiter_sse())
        print(received)
-
Spent a little time on this, and yeah, it's blocking somewhere in the channels publishing stack. I couldn't work out why, though.
-
Here's a minimal reproducer. It doesn't seem to be related to channels in particular, but to how our test client handles streams:

import asyncio

from litestar import get, Litestar
from litestar.response import Stream
from litestar.testing import create_async_test_client


async def main():
    @get("/")
    async def stream() -> Stream:
        async def generator():
            while True:
                yield "foo"
                await asyncio.sleep(0.1)

        return Stream(generator())

    async with create_async_test_client([stream]) as client, client.stream("GET", "/") as s:
        await anext(s.aiter_raw())


asyncio.run(main())

When you wrap the same example up in a regular app and run a normal HTTPX client against it, it works as expected. The reason this doesn't work in our test setup is that our test client seems to attempt to consume the whole stream before finishing off the response, which of course doesn't work here.
-
Tinkered a bit with the example provided by @provinzkraut and came up with the workaround shared below. I wanted to share this here first for anyone interested before starting on a PR. Also to get some early feedback if the [...]

For my use case the solution is good enough, but it breaks with the approach to provide test clients. It also splits the test into an [...]

Let me know what you think!

Description

I could not get the [...]

The app is a minimal example with SSE. I'm using Redis here as the channels backend, so I can publish messages from the test client. Ideally, both app and test client get the [...]

Lastly, the test module: two fixtures, one to get the server running and return the URL, the other to get a [...]

Helper module

"""
Provide a utility function to run an app as a subprocess.
"""
import pathlib
import socket
import subprocess
import time
from collections.abc import Iterator
from contextlib import contextmanager

import httpx


class StartupError(RuntimeError):
    pass


def _get_available_port() -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Bind to a free port provided by the host
        try:
            sock.bind(("localhost", 0))
        except OSError as e:
            raise StartupError("Could not find an open port") from e
        else:
            return sock.getsockname()[1]


@contextmanager
def run_app(workdir: pathlib.Path, app: str) -> Iterator[str]:
    port = _get_available_port()
    proc = subprocess.Popen(
        args=[
            "litestar",
            "--app",
            app,
            "run",
            "--port",
            str(port)
        ],
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        cwd=workdir
    )
    url = f"http://127.0.0.1:{port}"
    try:
        # Poll until the server accepts connections
        for _ in range(100):
            try:
                httpx.get(url, timeout=0.1)
                break
            except httpx.TransportError:
                time.sleep(1)
        else:
            raise StartupError(f"App did not start on {url}")
        yield url
    finally:
        # Stop the server even if startup or the test body raised
        proc.kill()

App module

"""
Assemble components into an app that shall be tested
"""
from litestar import Litestar, get
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from litestar.response import ServerSentEvent
from redis.asyncio import Redis


@get("/notify/{topic:str}")
async def get_notified(
    topic: str,
    channels: ChannelsPlugin
) -> ServerSentEvent:
    async def generator():
        async with channels.start_subscription([topic]) as subscriber:
            async for event in subscriber.iter_events():
                yield event

    return ServerSentEvent(generator(), event_type="CommandHistoryNotification")


def create_test_app():
    redis_instance = Redis()
    channels_backend = RedisChannelsPubSubBackend(redis=redis_instance)
    channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True)
    return Litestar(
        route_handlers=[get_notified],
        plugins=[channels_instance]
    )


app = create_test_app()

Test module

"""
Test the app running in a subprocess
"""
import asyncio
import pathlib
import sys

import pytest
import httpx
import httpx_sse
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from redis.asyncio import Redis

import helper

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

pytestmark = pytest.mark.anyio


@pytest.fixture(scope="session")
def anyio_backend() -> str:
    return "asyncio"


ROOT = pathlib.Path(__file__).parent


@pytest.fixture(name="notification_url", scope="session")
async def fx_notification_url() -> str:
    with helper.run_app(workdir=ROOT, app="example_app:app") as url:
        yield url


@pytest.fixture(name="redis_channels", scope="session")
async def fx_redis_channels():
    redis_instance = Redis()
    channels_backend = RedisChannelsPubSubBackend(redis=redis_instance)
    channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True)
    await channels_instance._on_startup()
    yield channels_instance
    await channels_instance._on_shutdown()


async def test_connection(notification_url: str, redis_channels: ChannelsPlugin):
    topic = "demo"
    message = "hello"
    running = asyncio.Event()
    running.set()

    async def send_notifications():
        print("started task")
        # Keep publishing until the test has received one event
        while running.is_set():
            await redis_channels.wait_published(message, channels=[topic])
            await asyncio.sleep(0.5)

    task = asyncio.create_task(send_notifications())
    async with httpx.AsyncClient(base_url=notification_url) as client:
        async with httpx_sse.aconnect_sse(
            client,
            "GET",
            f"{notification_url}/notify/{topic}"
        ) as event_source:
            async for event in event_source.aiter_sse():
                assert event.data == message
                running.clear()
                break
    # Let the publisher loop notice the cleared flag and exit
    await task
-
You could also hack this by giving your generator a timeout that defaults to None and feeding it some value in your tests. I'm doing that in https://github.com/euri10/litestar_saq_htmx/blob/main/tests/test_saq_htmx.py#L12
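A minimal sketch of that idea, using only `asyncio`: the generator takes an optional idle timeout that stays `None` in production and gets a small value in tests, so the stream ends and a client that buffers the whole body can complete. The names `events` and `demo` and the queue-based source are assumptions for illustration, not taken from the linked test.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Optional


async def events(
    queue: "asyncio.Queue[str]", timeout: Optional[float] = None
) -> AsyncIterator[str]:
    """Yield queue items; stop after ``timeout`` idle seconds when one is given."""
    while True:
        try:
            # timeout=None makes wait_for wait indefinitely (the production default)
            item = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            return  # End the stream so a buffering client can finish the response
        yield item


async def demo() -> list[str]:
    queue: asyncio.Queue[str] = asyncio.Queue()
    for message in ("a", "b"):
        queue.put_nowait(message)
    # In a test, pass a short timeout so the otherwise endless stream terminates
    return [item async for item in events(queue, timeout=0.05)]


received = asyncio.run(demo())
```

The trade-off is that the production generator carries a parameter that exists only for tests.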
-
Hi, I am a bit stuck and maybe I'm just missing the obvious.
For this I used:
I have an SSE endpoint that forwards data from a ChannelsPlugin topic. In the example below, I use another endpoint to publish on this topic, but this is mostly for demonstration's sake when testing it with an httpx script.
I also included the unit test from https://github.com/litestar-org/litestar/blob/main/tests/unit/test_response/test_sse.py#L16 just to check that I'm not crazy. Anyway.
The SSE endpoint implementation follows https://github.com/orgs/litestar-org/discussions/3104#discussioncomment-8453195
When running the script for testing it works as expected:
However, I did observe that there is a delay on the SSE endpoint. It takes about 3 seconds for the subscriber to connect to the SSE endpoint, and while this happens other requests are also blocked, e.g., the POST request only returns AFTER the SSE connection has been established.
But at least it works... When I run pytest, it just gets stuck and never finishes. I used the debugger to figure out where it gets stuck, and it's deep in the httpx code, where it waits on transport.handle_request(request) and never returns. So it might be a question for the httpx repo. I also tested this with an async pytest setup, and I tried using threading or asyncio tasks to circumvent the hang, but neither works.
I was also wondering if I'm just doing something fundamentally wrong with setting up the SSE endpoint. I just find it weird that it DOES work without pytest, even though it has this delay. I am aware that subscriber.iter_events() just blocks when there is nothing to retrieve, but since it DOES work with the standalone script, I had expected that it would also somehow work with pytest.
App
Pytest
Script for testing