-
Notifications
You must be signed in to change notification settings - Fork 26
/
trigger_flow_test.py
124 lines (96 loc) · 3.82 KB
/
trigger_flow_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
Test for the classic design pattern - where a client registers on the server to receive future events
"""
import os
import sys
from fastapi_websocket_rpc.utils import gen_uid
from fastapi_websocket_rpc.websocket_rpc_endpoint import WebsocketRPCEndpoint
from fastapi_websocket_rpc.websocket_rpc_client import WebSocketRpcClient
from fastapi_websocket_rpc.rpc_methods import RpcMethodsBase
from fastapi import (APIRouter, FastAPI,
WebSocket)
import uvicorn
import pytest
from multiprocessing import Process
import asyncio
# Port the test server listens on; override it through the PORT environment variable
PORT = int(os.getenv("PORT") or "9000")
# Text the server sends back in its wake-up call
MESSAGE = "Good morning!"
# Random ID, used as the client_id segment of the websocket path
CLIENT_ID = gen_uid()
uri = f"ws://localhost:{PORT}/ws/{CLIENT_ID}"
############################ Server ############################
class ServerMethods(RpcMethodsBase):
    """
    RPC methods the server exposes to connected clients
    """

    # Strong references to wake-up tasks that are still running. The event loop
    # only keeps weak references to tasks, so a task nobody holds on to can be
    # garbage collected before it finishes. Tasks remove themselves when done.
    _pending_wake_ups = set()

    async def register_wake_up_call(self, time_delta: float, name: str) -> str:
        """
        Schedule a delayed `wake_up` call back to the client on this channel

        Args:
            time_delta: seconds to sleep before calling the client back
            name: value passed back to the client in the `wake_up` call

        Returns:
            True, right away - the call back itself runs as a background task
        """
        async def wake_up_call():
            await asyncio.sleep(time_delta)
            await self.channel.other.wake_up(message=MESSAGE, name=name)

        task = asyncio.create_task(wake_up_call())
        # keep the task alive until it completes, then drop the reference
        self._pending_wake_ups.add(task)
        task.add_done_callback(self._pending_wake_ups.discard)
        # NOTE(review): annotated `-> str` but a bool is returned; left unchanged
        # because the RPC layer may use the annotation when building the
        # response - confirm before changing either side.
        return True
def setup_server():
    """
    Build a FastAPI app exposing ServerMethods over a websocket RPC endpoint
    and run it with uvicorn on PORT
    """
    # expose server methods
    rpc_endpoint = WebsocketRPCEndpoint(ServerMethods())
    ws_router = APIRouter()

    # hand every incoming websocket connection to the endpoint's main loop
    @ws_router.websocket("/ws/{client_id}")
    async def websocket_rpc_endpoint(websocket: WebSocket, client_id: str):
        await rpc_endpoint.main_loop(websocket, client_id)

    application = FastAPI()
    application.include_router(ws_router)
    uvicorn.run(application, port=PORT)
@pytest.fixture()
def server():
    """
    Run the RPC server in a separate daemon process for the duration of a test

    Yields:
        the started `Process` running `setup_server`
    """
    # Run the server as a separate process
    proc = Process(target=setup_server, args=(), daemon=True)
    proc.start()
    yield proc
    # Cleanup after test
    proc.kill()
    # Reap the child so it is really gone (and no longer holds the port)
    # before the next test starts its own server
    proc.join()
############################ Client ############################
class ClientMethods(RpcMethodsBase):
    """
    RPC methods the client exposes to the server; records the wake-up call it receives
    """

    def __init__(self):
        super().__init__()
        # payload of the last wake-up call (None until one arrives)
        self.message = None
        self.name = None
        # set once a wake-up call has been received
        self.woke_up_event = asyncio.Event()

    async def wake_up(self, message=None, name=None):
        """
        Store the payload of the wake-up call and signal that it arrived
        """
        self.message, self.name = message, name
        # signal wake-up
        self.woke_up_event.set()
@pytest.mark.asyncio
async def test_trigger_flow(server):
    """
    Test a cascading async trigger flow from client to server and back:
    ask the server to call us back later, then wait for that call
    """
    delay = 0.5
    caller_name = "Logan Nine Fingers"
    async with WebSocketRpcClient(
        uri, ClientMethods(), default_response_timeout=4
    ) as rpc:
        # Ask for a wake up call
        await rpc.other.register_wake_up_call(time_delta=delay, name=caller_name)
        # Wait for our wake-up call (or fail on timeout)
        await asyncio.wait_for(rpc.methods.woke_up_event.wait(), timeout=5)
        # Note: each channel has its own copy of the methods object,
        # so the received payload is read from the channel's copy
        received = rpc.channel.methods
        assert received.name == caller_name
        assert received.message == MESSAGE
@pytest.mark.asyncio
async def test_on_connect_trigger(server):
    """
    Test a cascading async trigger flow from client to server and back,
    starting from the on_connect callback
    """
    delay = 0.5
    caller_name = "Logan Nine Fingers"

    async def on_connect(channel):
        # Ask for a wake up call
        await channel.other.register_wake_up_call(time_delta=delay, name=caller_name)

    async with WebSocketRpcClient(
        uri,
        ClientMethods(),
        on_connect=[on_connect],
        default_response_timeout=4,
    ) as rpc:
        # Wait for our wake-up call (or fail on timeout)
        await asyncio.wait_for(rpc.methods.woke_up_event.wait(), timeout=5)
        # Note: each channel has its own copy of the methods object,
        # so the received payload is read from the channel's copy
        received = rpc.channel.methods
        assert received.name == caller_name
        assert received.message == MESSAGE