Skip to content

Commit

Permalink
feat(AppClient): support timeout and print logs (#273)
Browse files Browse the repository at this point in the history
* feat(fal): support timeout in AppClient

* feat: print AppClient logs
  • Loading branch information
efiop authored Aug 2, 2024
1 parent 6d40a46 commit 5d6799d
Showing 1 changed file with 40 additions and 6 deletions.
46 changes: 40 additions & 6 deletions projects/fal/src/fal/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import inspect
import json
import os
import queue
import re
import threading
import time
import typing
from contextlib import asynccontextmanager, contextmanager
Expand Down Expand Up @@ -72,17 +74,22 @@ def initialize_and_serve():


class EndpointClient:
def __init__(self, url, endpoint, signature):
def __init__(self, url, endpoint, signature, timeout: int | None = None):
self.url = url
self.endpoint = endpoint
self.signature = signature
self.timeout = timeout

annotations = endpoint.__annotations__ or {}
self.return_type = annotations.get("return") or None

def __call__(self, data):
with httpx.Client() as client:
resp = client.post(self.url + self.signature.path, json=dict(data))
resp = client.post(
self.url + self.signature.path,
json=data.dict() if hasattr(data, "dict") else dict(data),
timeout=self.timeout,
)
resp.raise_for_status()
resp_dict = resp.json()

Expand All @@ -93,37 +100,64 @@ def __call__(self, data):


class AppClient:
def __init__(self, cls, url):
def __init__(
self,
cls,
url,
timeout: int | None = None,
):
self.url = url
self.cls = cls

for name, endpoint in inspect.getmembers(cls, inspect.isfunction):
signature = getattr(endpoint, "route_signature", None)
if signature is None:
continue

setattr(self, name, EndpointClient(self.url, endpoint, signature))
endpoint_client = EndpointClient(
self.url,
endpoint,
signature,
timeout=timeout,
)
setattr(self, name, endpoint_client)

@classmethod
@contextmanager
def connect(cls, app_cls):
app = wrap_app(app_cls)
info = app.spawn()
_shutdown_event = threading.Event()

def _print_logs():
while not _shutdown_event.is_set():
try:
log = info.logs.get(timeout=0.1)
except queue.Empty:
continue
print(log)

_log_printer = threading.Thread(target=_print_logs, daemon=True)
_log_printer.start()

try:
with httpx.Client() as client:
retries = 100
while retries:
resp = client.get(info.url + "/health")

if resp.is_success:
break
elif resp.status_code != 500:
resp.raise_for_status()
time.sleep(0.1)
retries -= 1

yield cls(app_cls, info.url)
client = cls(app_cls, info.url)
yield client
finally:
info.stream.cancel()
_shutdown_event.set()
_log_printer.join()

def health(self):
with httpx.Client() as client:
Expand Down

0 comments on commit 5d6799d

Please sign in to comment.