-
Notifications
You must be signed in to change notification settings - Fork 26
/
basic_rpc_test.py
108 lines (90 loc) · 3.32 KB
/
basic_rpc_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import logging
import os
import sys
import asyncio
from multiprocessing import Process
import pytest
import uvicorn
from fastapi import FastAPI
from fastapi_websocket_rpc.rpc_methods import RpcUtilityMethods
from fastapi_websocket_rpc.logger import logging_config, LoggingModes, get_logger
from fastapi_websocket_rpc.websocket_rpc_client import WebSocketRpcClient
from fastapi_websocket_rpc.websocket_rpc_endpoint import WebsocketRPCEndpoint
from fastapi_websocket_rpc.utils import gen_uid
# Set debug logs (and direct all logs to UVICORN format)
logging_config.set_mode(LoggingModes.UVICORN, logging.DEBUG)
# Configurable
PORT = int(os.environ.get("PORT") or "9000")
uri = f"ws://localhost:{PORT}/ws"
def setup_server():
app = FastAPI()
endpoint = WebsocketRPCEndpoint(RpcUtilityMethods())
endpoint.register_route(app)
uvicorn.run(app, port=PORT)
@pytest.fixture(scope="module")
def server():
# Run the server as a separate process
proc = Process(target=setup_server, args=(), daemon=True)
proc.start()
yield proc
proc.kill() # Cleanup after test
@pytest.mark.asyncio
async def test_echo(server):
"""
Test basic RPC with a simple echo
"""
async with WebSocketRpcClient(uri, RpcUtilityMethods(), default_response_timeout=4) as client:
text = "Hello World!"
response = await client.other.echo(text=text)
assert response.result == text
@pytest.mark.asyncio
async def test_ping(server):
"""
Test basic RPC with a simple ping
"""
async with WebSocketRpcClient(uri, RpcUtilityMethods(), default_response_timeout=4) as client:
try:
response = await client.other._ping_()
passed = True
except Exception as e:
logging.exception("Ping test failed")
passed = False
assert passed
@pytest.mark.asyncio
async def test_other_channel_id(server):
"""
Test basic RPC with a simple _get_channel_id_
"""
async with WebSocketRpcClient(uri, RpcUtilityMethods(), default_response_timeout=4) as client:
try:
response = await client.other._get_channel_id_()
assert response.result_type == 'str'
passed = True
except Exception as e:
logging.exception("_get_channel_id test failed")
passed = False
assert passed
@pytest.mark.asyncio
async def test_keep_alive(server):
"""
Test basic RPC with a simple echo + keep alive in the background
"""
async with WebSocketRpcClient(uri, RpcUtilityMethods(), default_response_timeout=4, keep_alive=0.1) as client:
text = "Hello World!"
response = await client.other.echo(text=text)
assert response.result == text
await asyncio.sleep(0.6)
@pytest.mark.asyncio
async def test_structured_response(server):
"""
Test RPC with structured (pydantic model) data response
Using process details as example data
"""
async with WebSocketRpcClient(uri, RpcUtilityMethods(), default_response_timeout=4) as client:
utils = RpcUtilityMethods()
ourProcess = await utils.get_process_details()
response = await client.other.get_process_details()
# We got a valid process id
assert isinstance(response.result["pid"], int)
# We have all the details form the other process
assert "cmd" in response.result