Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HH-234831 small fixes #746

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/example_app/pages/example.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from frontik.dependencies import HttpClientT
from frontik.dependencies import HttpClient
from frontik.routing import router


@router.get('/example')
async def example_page(http_client: HttpClientT) -> dict:
async def example_page(http_client: HttpClient) -> dict:
result = await http_client.get_url('http://example.com', '/')
return {'example': result.status_code}
32 changes: 32 additions & 0 deletions frontik/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@
from http_client import options as http_client_options
from http_client.balancing import RequestBalancerBuilder
from lxml import etree
from starlette.types import ASGIApp, Receive, Scope, Send
from tornado import httputil

from frontik import app_integrations
from frontik.app_integrations.statsd import StatsDClient, StatsDClientStub, create_statsd_client
from frontik.balancing_client import create_http_client
from frontik.dependencies import clients
from frontik.handler_asgi import serve_tornado_request
from frontik.options import options
from frontik.process import WorkerState
from frontik.routing import (
import_all_pages,
method_not_allowed_router,
not_found_router,
router,
routers,
)
Expand Down Expand Up @@ -202,6 +207,8 @@ def __init__(self, frontik_app: FrontikApplication) -> None:
self.get_frontik_and_apps_versions = frontik_app.get_frontik_and_apps_versions
self.statsd_client = frontik_app.statsd_client

self.add_middleware(FrontikMiddleware)


@router.get('/version')
async def get_version(request: Request) -> Response:
Expand All @@ -212,3 +219,28 @@ async def get_version(request: Request) -> Response:
@router.get('/status')
async def get_status(request: Request) -> ORJSONResponse:
return ORJSONResponse(request.app.get_current_status())


class FrontikMiddleware:
def __init__(self, app: ASGIApp) -> None:
self.app = app

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope['type'] != 'http':
await self.app(scope, receive, send)
return

clients.get()['http_client'] = create_http_client(scope)
clients.get()['app_config'] = scope['app'].config
clients.get()['statsd_client'] = scope['app'].statsd_client
await self.app(scope, receive, send)


@not_found_router.get('__not_found')
async def default_404() -> Response:
return Response(status_code=404)


@method_not_allowed_router.get('__method_not_allowed')
async def default_405(request: Request) -> Response:
return Response(status_code=405, headers={'Allow': ', '.join(request.scope['allowed_methods'])})
39 changes: 18 additions & 21 deletions frontik/balancing_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import time

from fastapi import Request
from http_client import HttpClient, RequestBuilder
from http_client.request_response import USER_AGENT_HEADER
from starlette.datastructures import Headers
from starlette.types import Scope

from frontik import request_context
from frontik.auth import DEBUG_AUTH_HEADER_NAME
Expand All @@ -13,44 +14,40 @@
OUTER_TIMEOUT_MS_HEADER = 'X-Outer-Timeout-Ms'


def modify_http_client_request(request: Request, balanced_request: RequestBuilder) -> None:
def modify_http_client_request(scope: Scope, balanced_request: RequestBuilder) -> None:
headers = Headers(scope=scope)
balanced_request.headers['x-request-id'] = request_context.get_request_id()
balanced_request.headers[OUTER_TIMEOUT_MS_HEADER] = f'{balanced_request.request_timeout * 1000:.0f}'

outer_timeout = request.headers.get(OUTER_TIMEOUT_MS_HEADER.lower())
outer_timeout = headers.get(OUTER_TIMEOUT_MS_HEADER.lower())
if outer_timeout:
timeout_checker = get_timeout_checker(
request.headers.get(USER_AGENT_HEADER.lower()),
headers.get(USER_AGENT_HEADER.lower()),
float(outer_timeout),
request['start_time'],
scope['start_time'],
)
timeout_checker.check(balanced_request)

if request['debug_mode'].pass_debug:
if scope['debug_mode'].pass_debug:
balanced_request.headers[DEBUG_HEADER_NAME] = 'true'

# debug_timestamp is added to avoid caching of debug responses
balanced_request.path = make_url(balanced_request.path, debug_timestamp=int(time.time()))

for header_name in ('Authorization', DEBUG_AUTH_HEADER_NAME):
authorization = request.headers.get(header_name.lower())
authorization = headers.get(header_name.lower())
if authorization is not None:
balanced_request.headers[header_name] = authorization


def get_http_client(modify_request_hook=None):
async def _get_http_client(request: Request) -> HttpClient:
def hook(balanced_request):
if modify_request_hook is not None:
modify_request_hook(balanced_request)
def create_http_client(scope: Scope) -> HttpClient:
def hook(balanced_request):
if (local_hook := scope.get('_http_client_hook')) is not None:
local_hook(balanced_request)

modify_http_client_request(request, balanced_request)
modify_http_client_request(scope, balanced_request)

http_client = request.app.http_client_factory.get_http_client(
modify_http_request_hook=hook,
debug_enabled=request['debug_mode'].enabled,
)

return http_client

return _get_http_client
return scope['app'].http_client_factory.get_http_client(
modify_http_request_hook=hook,
debug_enabled=scope['debug_mode'].enabled,
)
2 changes: 1 addition & 1 deletion frontik/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def transform_chunk(
else:
wrap_headers = {'Content-Type': media_types.APPLICATION_XML, DEBUG_HEADER_NAME: 'true'}

chunk = b'Streamable response' if response.data_written else _data_to_chunk(response.body)
chunk = b'Streamable response' if response.headers_written else _data_to_chunk(response.body)
start_time = time.time()
handler_name = request_context.get_handler_name()

Expand Down
40 changes: 29 additions & 11 deletions frontik/dependencies/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
import contextvars
from typing import Annotated, Any

from fastapi import Depends, Request
from http_client import HttpClient
import http_client
from fastapi import Depends

from frontik.app_integrations.statsd import StatsDClient
from frontik.balancing_client import get_http_client
from frontik.app_integrations import statsd

clients: contextvars.ContextVar = contextvars.ContextVar('clients')

async def get_app_config(request: Request) -> Any:
return request.app.config

def get_app_config() -> Any:
return clients.get().get('app_config')

async def get_statsd_client(request: Request) -> StatsDClient:
return request.app.statsd_client

async def _get_app_config() -> Any:
return get_app_config()

StatsDClientT = Annotated[StatsDClient, Depends(get_statsd_client)]
AppConfig = Annotated[Any, Depends(get_app_config)]
HttpClientT = Annotated[HttpClient, Depends(get_http_client())]

def get_http_client() -> http_client.HttpClient:
return clients.get().get('http_client')


async def _get_http_client() -> http_client.HttpClient:
return get_http_client()


def get_statsd_client() -> statsd.StatsDClient:
return clients.get().get('statsd_client')


async def _get_statsd_client() -> statsd.StatsDClient:
return get_statsd_client()


StatsDClient = Annotated[statsd.StatsDClient, Depends(_get_statsd_client)]
AppConfig = Annotated[Any, Depends(_get_app_config)]
HttpClient = Annotated[http_client.HttpClient, Depends(_get_http_client)]
2 changes: 1 addition & 1 deletion frontik/frontik_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(
self.status_code = status_code
self.body = body
self._reason = reason
self.data_written = False
self.headers_written = False

@property
def reason(self) -> str:
Expand Down
75 changes: 20 additions & 55 deletions frontik/handler_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@
import logging
from contextlib import ExitStack
from functools import partial
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING

from fastapi.routing import APIRoute
from tornado import httputil
from tornado.httputil import HTTPServerRequest

from frontik import media_types, request_context
from frontik import request_context
from frontik.debug import DebugMode, DebugTransform
from frontik.frontik_response import FrontikResponse
from frontik.http_status import CLIENT_CLOSED_REQUEST
from frontik.loggers import CUSTOM_JSON_EXTRA, JSON_REQUESTS_LOGGER
from frontik.request_integrations import get_integrations
from frontik.request_integrations.integrations_dto import IntegrationDto
from frontik.routing import find_route, get_allowed_methods
from frontik.routing import find_route

if TYPE_CHECKING:
from frontik.app import FrontikApplication, FrontikAsgiApp
Expand Down Expand Up @@ -51,7 +50,7 @@ async def serve_tornado_request(
assert tornado_request.connection is not None
tornado_request.connection.set_close_callback(None) # type: ignore

if not response.data_written:
if not response.headers_written:
for integration in integrations.values():
integration.set_response(response)

Expand All @@ -74,20 +73,18 @@ async def process_request(
debug_mode = make_debug_mode(frontik_app, tornado_request)
if debug_mode.auth_failed():
assert debug_mode.failed_auth_header is not None
return make_debug_auth_failed_response(debug_mode.failed_auth_header)
return FrontikResponse(
status_code=http.client.UNAUTHORIZED, headers={'WWW-Authenticate': debug_mode.failed_auth_header}
)

assert tornado_request.method is not None

scope = find_route(tornado_request.path, tornado_request.method)
route: Optional[APIRoute] = scope['route']
tornado_request._path_format = scope['route'].path_format # type: ignore

if route is None:
response = await make_not_found_response(frontik_app, tornado_request, debug_mode, scope)
else:
tornado_request._path_format = route.path_format # type: ignore
response = await execute_asgi_page(frontik_app, asgi_app, tornado_request, scope, debug_mode, integrations)
response = await execute_asgi_page(frontik_app, asgi_app, tornado_request, scope, debug_mode, integrations)

if debug_mode.enabled and not response.data_written:
if debug_mode.enabled and not response.headers_written:
debug_transform = DebugTransform(frontik_app, debug_mode)
response = debug_transform.transform_chunk(tornado_request, response)

Expand Down Expand Up @@ -131,19 +128,19 @@ async def receive():
'more_body': False,
}

async def send(data):
async def send(message):
assert tornado_request.connection is not None

if data['type'] == 'http.response.start':
response.status_code = int(data['status'])
for h in data['headers']:
if message['type'] == 'http.response.start':
response.status_code = int(message['status'])
for h in message['headers']:
if len(h) == 2:
response.headers.add(h[0].decode(CHARSET), h[1].decode(CHARSET))
elif data['type'] == 'http.response.body':
chunk = data['body']
if debug_mode.enabled or not data.get('more_body'):
elif message['type'] == 'http.response.body':
chunk = message['body']
if debug_mode.enabled or not message.get('more_body'):
response.body += chunk
elif not response.data_written:
elif not response.headers_written:
for integration in integrations.values():
integration.set_response(response)

Expand All @@ -152,39 +149,17 @@ async def send(data):
headers=response.headers,
chunk=chunk,
)
response.data_written = True
response.headers_written = True
else:
await tornado_request.connection.write(chunk)
else:
raise RuntimeError(f'Unsupported response type "{data["type"]}" for asgi app')
raise RuntimeError(f'Unsupported response type "{message["type"]}" for asgi app')

await asgi_app(scope, receive, send)

return response


async def make_not_found_response(
frontik_app: FrontikApplication,
tornado_request: httputil.HTTPServerRequest,
debug_mode: DebugMode,
scope: dict,
) -> FrontikResponse:
allowed_methods = get_allowed_methods(scope)

if allowed_methods and hasattr(frontik_app, 'method_not_allowed_handler'):
return await frontik_app.method_not_allowed_handler(
tornado_request, debug_mode, path_params={'allowed_methods': allowed_methods}
)

if allowed_methods:
return FrontikResponse(status_code=405, headers={'Allow': ', '.join(allowed_methods)})

if hasattr(frontik_app, 'not_found_handler'):
return await frontik_app.not_found_handler(tornado_request, debug_mode, path_params={})

return build_error_data(404, 'Not Found')


def make_debug_mode(frontik_app: FrontikApplication, tornado_request: HTTPServerRequest) -> DebugMode:
debug_mode = DebugMode(tornado_request)

Expand All @@ -199,16 +174,6 @@ def make_debug_mode(frontik_app: FrontikApplication, tornado_request: HTTPServer
return debug_mode


def make_debug_auth_failed_response(auth_header: str) -> FrontikResponse:
return FrontikResponse(status_code=http.client.UNAUTHORIZED, headers={'WWW-Authenticate': auth_header})


def build_error_data(status_code: int = 500, message: Optional[str] = 'Internal Server Error') -> FrontikResponse:
headers = {'Content-Type': media_types.TEXT_HTML}
data = f'<html><title>{status_code}: {message}</title><body>{status_code}: {message}</body></html>'.encode()
return FrontikResponse(status_code=status_code, headers=headers, body=data)


def _on_connection_close(tornado_request, process_request_task, integrations):
request_id = integrations.get('request_id', IntegrationDto()).get_value()
with request_context.request_context(request_id):
Expand Down
4 changes: 2 additions & 2 deletions frontik/request_integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from frontik.request_integrations.clients import clients_ctx
from frontik.request_integrations.request_id import request_id_ctx
from frontik.request_integrations.request_limiter import request_limiter
from frontik.request_integrations.server_timing import server_timing
from frontik.request_integrations.telemetry import otel_instrumentation_ctx

_integrations: list = [
('request_id', request_id_ctx),
('request_limiter', request_limiter),
('server_timing', server_timing),
('telemetry', otel_instrumentation_ctx),
('clients', clients_ctx),
]


Expand Down
13 changes: 13 additions & 0 deletions frontik/request_integrations/clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from contextlib import contextmanager

from frontik.dependencies import clients
from frontik.request_integrations.integrations_dto import IntegrationDto


@contextmanager
def clients_ctx(_frontik_app, _tornado_request):
token = clients.set({})
try:
yield IntegrationDto()
finally:
clients.reset(token)
2 changes: 1 addition & 1 deletion frontik/request_integrations/request_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


@contextmanager
def request_id_ctx(_, tornado_request):
def request_id_ctx(_frontik_app, tornado_request):
request_id = tornado_request.headers.get('X-Request-Id') or generate_uniq_timestamp_request_id()
if options.validate_request_id:
check_request_id(request_id)
Expand Down
Loading
Loading