-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathapi_client.py
61 lines (48 loc) · 1.72 KB
/
api_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio
import dataclasses
import json
from asyncio import StreamReader, StreamWriter
from typing import Any
from pvp_ml.api import Request, Response
class ApiClient:
def __init__(self, host: str = "localhost", port: int = 9999):
self._host = host
self._port = port
self._reader: StreamReader | None = None
self._writer: StreamWriter | None = None
async def send_request(self, request: Request) -> Response:
if self._writer is None:
await self._connect()
assert self._writer is not None
assert self._reader is not None
try:
request_json = json.dumps(dataclasses.asdict(request))
self._writer.write((request_json + "\n").encode())
await self._writer.drain()
response_json = await self._reader.readline()
if not response_json:
raise IOError
response_dict = json.loads(response_json.decode())
return Response(**response_dict)
except Exception as e:
await self.close()
raise e
async def close(self) -> None:
if self._writer is not None:
self._writer.close()
try:
await self._writer.wait_closed()
except ConnectionError:
pass
self._reader = None
self._writer = None
async def _connect(self) -> None:
self._reader, self._writer = await asyncio.open_connection(
self._host, self._port
)
async def __aenter__(self) -> "ApiClient":
if self._writer is None:
await self._connect()
return self
async def __aexit__(self, *args: Any) -> None:
await self.close()