-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.py
33 lines (27 loc) · 1.07 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from base64 import b64encode
from json import dumps
import aiohttp
import async_timeout
class RestClient:
domain = "api.dataforseo.com"
def __init__(self, username, password):
self.username = username
self.password = password
async def request(self, path, method, data=None):
async with aiohttp.ClientSession() as session:
url = f"https://{self.domain}{path}"
headers = {
'Authorization': f'Basic {b64encode(f"{self.username}:{self.password}".encode()).decode()}',
'Content-Encoding': 'gzip'
}
async with async_timeout.timeout(10):
async with session.request(method, url, headers=headers, data=data) as response:
return await response.json()
async def get_async(self, path):
return await self.request(path, 'GET')
async def post_async(self, path, data):
if isinstance(data, str):
data_str = data
else:
data_str = dumps(data)
return await self.request(path, 'POST', data_str)