Skip to content

Commit

Permalink
feat: megaparse sdk tests (#148)
Browse files Browse the repository at this point in the history
* working parsing

* tests timeouts

* tests errors

* test CI

* any worker

* runs on

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* ci

* CI timeout tests

* added sudo

* ci

* remove tls CI

---------

Co-authored-by: aminediro <[email protected]>
  • Loading branch information
AmineDiro and aminediro authored Nov 25, 2024
1 parent b22e631 commit e030285
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 55 deletions.
87 changes: 87 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# CI workflow: installs system packages and a local NATS server, then runs the
# megaparse-sdk test suite through rye.
name: Run Tests API and Worker

on:
  pull_request:
  workflow_dispatch:

env:
  # Token passed to the NATS server started below (--auth "$NATS_TOKEN").
  NATS_TOKEN: test

jobs:
  test:
    name: Run tests
    runs-on: ubuntu-latest
    steps:
      - name: 👀 Checkout code
        uses: actions/checkout@v4
        with:
          submodules: true

      # NOTE(review): hashFiles() is documented as resolving paths inside the
      # workspace; confirm this key actually varies with /etc/apt/sources.list.
      - name: Setup apt cache
        uses: actions/cache@v4
        with:
          path: /var/cache/apt/archives
          key: ${{ runner.os }}-apt-${{ hashFiles('/etc/apt/sources.list') }}

      # netcat-openbsd provides the `nc` binary used by the verification step.
      - name: 😭 Install system dependencies
        run: |
          sudo apt-get update && sudo apt-get install -y \
            netcat-openbsd \
            unzip \
            libgeos-dev \
            libcurl4-openssl-dev \
            libssl-dev \
            binutils \
            curl \
            git \
            autoconf \
            automake \
            build-essential \
            libtool \
            gcc \
            libmagic-dev \
            poppler-utils \
            tesseract-ocr \
            libreoffice \
            libpq-dev \
            pandoc

      - name: 🔽 Download and Install NATS Server
        run: |
          curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.22/nats-server-v2.10.22-linux-amd64.zip -o nats-server.zip
          unzip nats-server.zip -d nats-server && sudo cp nats-server/nats-server-v2.10.22-linux-amd64/nats-server /usr/bin

      # Started in the background; output goes to nats.log for the next step.
      - name: 🛠️ Set up NATS arguments
        run: |
          nohup nats-server \
            --addr 0.0.0.0 \
            --port 4222 \
            --auth "$NATS_TOKEN" > nats.log 2>&1 &

      - name: 🔍 Verify NATS Server is Running
        run: |
          sleep 1 # Give the server some time to start
          if nc -zv localhost 4222; then
            echo "✅ NATS Server is running on port 4222."
          else
            echo "❌ Failed to start NATS Server."
            cat nats.log
            exit 1
          fi

      # The id is required: the next step reads steps.setup-rye.outputs.
      - name: 🔨 Install the latest version of rye
        id: setup-rye
        uses: eifinger/setup-rye@v4
        with:
          enable-cache: true

      - name: 🎯 Cache hit!
        if: steps.setup-rye.outputs.cache-hit == 'true'
        run: echo "Rye cache was restored"

      - name: 🔄 Sync dependencies
        run: |
          UV_INDEX_STRATEGY=unsafe-first-match rye sync --no-lock

      - name: 🚀 Run tests
        run: |
          rye test -p megaparse-sdk
69 changes: 57 additions & 12 deletions libs/megaparse_sdk/megaparse_sdk/client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import asyncio
import enum
import logging
from io import BytesIO
from pathlib import Path
from typing import Any
from types import TracebackType
from typing import Any, Self

import httpx
import nats
from nats.errors import TimeoutError
from nats.errors import NoRespondersError, TimeoutError

from megaparse_sdk.config import ClientNATSConfig, MegaParseConfig
from megaparse_sdk.schema.mp_exceptions import (
DownloadError,
InternalServiceError,
MemoryLimitExceeded,
ModelNotSupported,
ParsingException,
)
Expand Down Expand Up @@ -67,25 +70,65 @@ async def close(self):
await self.session.aclose()


class ClientState(enum.Enum):
    """Lifecycle stages a client instance moves through."""

    # Initial state, before anything has been opened.
    UNOPENED = 1
    # A request has been sent, or the client is inside a `with` block.
    OPENED = 2
    # The `with` block has been exited, or `close()` has been called.
    CLOSED = 3


class MegaParseNATSClient:
def __init__(self, config: ClientNATSConfig):
    """Store the NATS settings and start in the unopened state.

    Args:
        config: Client settings; ``max_retries``, ``backoff`` and
            ``ssl_config`` are read here.
    """
    self.nc_config = config
    self.max_retries = self.nc_config.max_retries
    self.backoff = self.nc_config.backoff
    # Always define ssl_ctx: without an SSL config it is None rather than
    # a missing attribute.
    self.ssl_ctx = (
        load_ssl_cxt(self.nc_config.ssl_config)
        if self.nc_config.ssl_config
        else None
    )
    # Client connection
    self._state = ClientState.UNOPENED
    self._nc = None

async def _get_nc(self):
if self._nc is None:
self._nc = await nats.connect(
self.nc_config.nats_endpoint, tls=self.ssl_ctx
self.nc_config.endpoint,
tls=self.ssl_ctx,
connect_timeout=self.nc_config.connect_timeout,
reconnect_time_wait=self.nc_config.reconnect_time_wait,
max_reconnect_attempts=self.nc_config.max_reconnect_attempts,
)
return self._nc
return self._nc

async def __aenter__(self: Self) -> Self:
    """Enter the async context: mark the client opened and connect.

    Returns:
        This client instance.

    Raises:
        RuntimeError: If the client is already opened or has been closed.
    """
    if self._state == ClientState.UNOPENED:
        self._state = ClientState.OPENED
        await self._get_nc()
        return self

    # Any other state is a misuse; pick the message that matches it.
    reasons = {
        ClientState.OPENED: "Cannot open a client instance more than once.",
        ClientState.CLOSED: (
            "Cannot reopen a client instance, client was closed."
        ),
    }
    raise RuntimeError(reasons[self._state])

async def __aexit__(
    self,
    exc_type: type[BaseException] | None = None,
    exc_value: BaseException | None = None,
    traceback: TracebackType | None = None,
) -> None:
    """Leave the async context: mark the client closed, then close it.

    The exception arguments are accepted but not inspected. The method
    returns None, so an exception raised inside the block is not suppressed.
    """
    # The state is updated before closing, so it reads CLOSED even if
    # aclose() raises.
    self._state = ClientState.CLOSED
    await self.aclose()

async def parse_url(self, url: str) -> str:
    """Request parsing of the document at ``url`` and return the result.

    Args:
        url: Address of the document to parse.

    Returns:
        Whatever ``_send_req`` returns for the request.
    """
    url_inp = ParseUrlInput(url=url)
    # Send the request exactly once and hand its result back to the caller.
    return await self._send_req(MPInput(input=url_inp))

async def parse_file(self, file: Path | BytesIO) -> str:
if isinstance(file, Path):
Expand All @@ -108,9 +151,10 @@ async def _send_req(self, inp: MPInput) -> str:
for attempt in range(self.max_retries):
try:
return await self._send_req_inner(inp)
except TimeoutError:
logger.error(f"Timeout error parsing. Retrying {attempt} time")
except (TimeoutError, NoRespondersError) as e:
logger.error(f"Sending req error: {e}. Retrying for {attempt} time")
if attempt < self.max_retries - 1:
logger.debug(f"Backoff for {2**self.backoff}s")
await asyncio.sleep(2**self.backoff)
raise ParsingException

Expand All @@ -122,15 +166,17 @@ async def _send_req_inner(self, inp: MPInput):
timeout=self.nc_config.timeout,
)
response = MPOutput.model_validate_json(raw_response.data.decode("utf-8"))
return self._handle_mp_output(response)

def _handle_mp_output(self, response: MPOutput) -> str:
if response.output_type == MPOutputType.PARSE_OK:
assert response.result, "Parsing OK but response is None"
return response.result
elif response.output_type == MPOutputType.PARSE_ERR:
assert response.err, "Parsing OK but response is None"

match response.err.mp_err_code:
case MPErrorType.MEMORY_LIMIT:
raise ModelNotSupported
raise MemoryLimitExceeded
case MPErrorType.INTERNAL_SERVER_ERROR:
raise InternalServiceError
case MPErrorType.MODEL_NOT_SUPPORTED:
Expand All @@ -139,9 +185,8 @@ async def _send_req_inner(self, inp: MPInput):
raise DownloadError
case MPErrorType.PARSING_ERROR:
raise ParsingException
else:
raise ValueError(f"unknown service response type: {response}")
raise ValueError(f"unknown service response type: {response}")

async def close(self):
async def aclose(self):
nc = await self._get_nc()
await nc.close()
9 changes: 6 additions & 3 deletions libs/megaparse_sdk/megaparse_sdk/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ class MegaParseConfig(BaseSettings):


class SSLConfig(BaseModel):
    """File paths for the client's TLS key, certificate and optional CA."""

    ssl_key_file: FilePath
    ssl_cert_file: FilePath
    # Declared once, as optional: leave unset when no CA file is needed.
    ca_cert_file: FilePath | None = None


class ClientNATSConfig(BaseSettings):
Expand All @@ -28,7 +28,10 @@ class ClientNATSConfig(BaseSettings):
)
subject: Literal["parsing"] = "parsing"
endpoint: str = "https://[email protected]:4222"
timeout: int = 600
timeout: float = 600
max_retries: int = 5
backoff: int = 3
backoff: float = 3
connect_timeout: int = 5
reconnect_time_wait: int = 1
max_reconnect_attempts: int = 20
ssl_config: SSLConfig | None = None
3 changes: 2 additions & 1 deletion libs/megaparse_sdk/megaparse_sdk/utils/load_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

def load_ssl_cxt(ssl_config: SSLConfig):
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.load_verify_locations(cafile=ssl_config.ca_cert_file)
if ssl_config.ca_cert_file:
context.load_verify_locations(cafile=ssl_config.ca_cert_file)
context.load_cert_chain(
certfile=ssl_config.ssl_cert_file, keyfile=ssl_config.ssl_key_file
)
Expand Down
Empty file.
29 changes: 29 additions & 0 deletions libs/megaparse_sdk/tests/certs/rootCA.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-----BEGIN CERTIFICATE-----
MIIFCzCCA3OgAwIBAgIQESt0eck2KvFrAMyiDyceujANBgkqhkiG9w0BAQsFADCB
nTEeMBwGA1UEChMVbWtjZXJ0IGRldmVsb3BtZW50IENBMTkwNwYDVQQLDDBhbWlu
ZUBhbWluZXMtTWFjQm9vay1Qcm8ubG9jYWwgKGFtaW5lIGRpcmhvdXNzaSkxQDA+
BgNVBAMMN21rY2VydCBhbWluZUBhbWluZXMtTWFjQm9vay1Qcm8ubG9jYWwgKGFt
aW5lIGRpcmhvdXNzaSkwHhcNMjQxMTE5MTAwMTA5WhcNMzQxMTE5MTAwMTA5WjCB
nTEeMBwGA1UEChMVbWtjZXJ0IGRldmVsb3BtZW50IENBMTkwNwYDVQQLDDBhbWlu
ZUBhbWluZXMtTWFjQm9vay1Qcm8ubG9jYWwgKGFtaW5lIGRpcmhvdXNzaSkxQDA+
BgNVBAMMN21rY2VydCBhbWluZUBhbWluZXMtTWFjQm9vay1Qcm8ubG9jYWwgKGFt
aW5lIGRpcmhvdXNzaSkwggGiMA0GCSqGSIb3DQEBAQUAA4IBjwAwggGKAoIBgQCw
6TX1kvqVMb8ZUQVT/vuDsedmbYgSFn68yJRlmE9BsqG7TLQHl2Kw6VQqZBSIkeZG
CypmUysX/3qrvICeArIdmmsrWUTDYPoauw/a/RY0I07rALj3YR0Y7039Hxf/UPT9
xlUtnM2NafkZyp6WRjEN0N4ETvJDIbUQiosiiPilxhwRbJURhT/JPskaw+OM2Sw5
dFAT20zkYC5VIc4wJBFLAMG0XzI6Sy/4wI1WdRBXd2UMpQU4u7TyD0RB4mnHorV6
kXjtLKD/KWSrSG1nnum9SB9eVatbRD+TUgoclwAKedrlCDEM4EsXVVuUuYCizQNb
+H3BSPfj1upUW5eKfgAyB+8r4QGf2yCY9O8NMMrJ1K5Qv4vSuWAU2tZqAyE8Z4Ke
UtHsl/M0zIvIKwyki2N/rieL/m6lTzS3dwSf9vv7eePEvxd8SBClSF07MUzyxkZ5
UYNxaK5t2ZRADZ6n/9/hAQsMscCkHiX1N2ypBFV+86Pr78BC48JgIyCMwuiBN4sC
AwEAAaNFMEMwDgYDVR0PAQH/BAQDAgIEMBIGA1UdEwEB/wQIMAYBAf8CAQAwHQYD
VR0OBBYEFFdsN4L0DOS2tdn5PNLSV6DP9eJeMA0GCSqGSIb3DQEBCwUAA4IBgQBj
KosfLfW/ZH80NM16pvpyRF3mCi+q+I+P8zrfilMYJBH4EEdEGAUgTO5do1kJXeel
Wky+FNxaP6KCNiT+0amypKg+yjBlnqLKVdnEgR5s12ZfmerV59stx1A/c/bYMEAS
re6xskBkowP2cVQHAC2dy/0Ov+lZsiNaPV2bQx6KUJurveebUQsH3uF3ZEhnUVQ6
rt5+JGY4x9Tr1YMhvHqEDTrsipPdDB1MyW1SnCkqSXrz+DPXGd8BW0O0hpM5la81
J+rfZGinbcUgXM6JMLIHDxLc4Xxzm4NijFzXhbR3XPXqEwsnZOuxcYYFgUGs3FwS
4ro+34a/O4uKS2KV8wsUWj/tWD2rLpduDgag4WSipCvWtaNve8gPdUiyPxUqxyoZ
aFAFg/izXwmRntogJtV0Zvo3fqAaQQDl8t2s21IIx0wmgHzgmkswb5OwFg3dOn/S
lmaH8v7FCBP7jHx/NCPTT5Sy/1EMRATmhFDUZ8Bod/TIlV3e+FCVqlX3kBBRbAU=
-----END CERTIFICATE-----
Loading

0 comments on commit e030285

Please sign in to comment.