-
Notifications
You must be signed in to change notification settings - Fork 0
/
router.py
127 lines (101 loc) · 3.56 KB
/
router.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import uuid
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from ex_back.database import get_db
from ex_back.models import OrderModel
from ex_back.types import (
CreateOrderModel,
CreateOrderResponse,
CreateOrderResponseModel,
EventOutboxResponse,
EventStoreResponse,
EventType,
OrderCreated,
)
from shared.managers.events import EventStoreManager, OutboxEventManager
router = APIRouter()
@router.post(
"/orders",
status_code=201,
response_model=CreateOrderResponse,
response_model_by_alias=True,
)
async def create_order(
model: CreateOrderModel, db: Session = Depends(get_db)
) -> CreateOrderResponse:
# Create event for order creation
event = OrderCreated(
id_=uuid.uuid4(),
type=model.type_,
side=model.side,
instrument=model.instrument,
limit_price=model.limit_price,
quantity=model.quantity,
)
# Using manager to store the event
manager = OutboxEventManager(db)
outbox_event = manager.save(EventType.ORDER_SUBMITTED, event.json())
# The actual handling of the event will be done by an asynchronous worker
# that processes the events in the outbox table
response = CreateOrderResponse(
event_id=outbox_event.id,
created_at=outbox_event.created_at,
event_type=outbox_event.event_type,
status=outbox_event.status,
)
return response
@router.get(
"/events/outbox",
response_model=List[EventOutboxResponse],
response_model_by_alias=True,
)
def list_outbox_events(db: Session = Depends(get_db)) -> List[EventOutboxResponse]:
manager = OutboxEventManager(db)
outbox_events = manager.get_all_events()
# Convert SQLAlchemy model instances to Pydantic model instances
event_responses = [EventOutboxResponse.from_orm(event) for event in outbox_events]
return event_responses
@router.get(
"/events/outbox/{event_id}",
response_model=EventOutboxResponse,
response_model_by_alias=True,
)
def get_outbox_event(
event_id: int, db: Session = Depends(get_db)
) -> EventOutboxResponse:
manager = OutboxEventManager(db)
outbox_event = manager.get_event_by_id(event_id)
if not outbox_event:
raise HTTPException(status_code=404, detail="Event not found")
return EventOutboxResponse.from_orm(outbox_event)
@router.get(
"/events/store",
response_model=List[EventStoreResponse],
response_model_by_alias=True,
)
def list_store_events(db: Session = Depends(get_db)) -> List[EventStoreResponse]:
manager = EventStoreManager(db)
store_events = manager.get_all_events()
# Convert SQLAlchemy model instances to Pydantic model instances
event_responses = [EventStoreResponse.from_orm(event) for event in store_events]
return event_responses
@router.get(
"/events/store/{event_id}",
response_model=EventStoreResponse,
response_model_by_alias=True,
)
def get_store_event(event_id: int, db: Session = Depends(get_db)) -> EventStoreResponse:
manager = EventStoreManager(db)
store_event = manager.get_event_by_id(event_id)
if not store_event:
raise HTTPException(status_code=404, detail="Event not found")
return EventStoreResponse.from_orm(store_event)
@router.get(
"/orders",
response_model=List[CreateOrderResponseModel],
response_model_by_alias=True,
)
def list_orders(db: Session = Depends(get_db)) -> List[CreateOrderResponseModel]:
db_orders = db.query(OrderModel).all()
return [CreateOrderResponseModel.from_orm(order) for order in db_orders]