non blocking monitor (#131)
* non blocking monitor

* Fix bug where .popleft was called on list object

---------

Co-authored-by: Manveer <[email protected]>
samsja and manveerxyz authored Oct 24, 2024
1 parent eaf6486 commit f4b7c85
Showing 1 changed file with 31 additions and 12 deletions.
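For context, the pattern this commit introduces can be summarized in a minimal standalone sketch: batch sends are scheduled as tasks on the monitor's event loop instead of being awaited with run_until_complete, finished tasks are drained from the front of a deque with .popleft() (the bug fix noted above: a plain list has no popleft), and anything still in flight is flushed before the loop is closed. The class name NonBlockingSender and the asyncio.sleep call standing in for the HTTP POST are illustrative assumptions, not part of the actual diff below.

import asyncio
from collections import deque


class NonBlockingSender:  # illustrative stand-in for the real Monitor class
    def __init__(self):
        self.loop = asyncio.new_event_loop()
        # deque, not list: completed tasks are drained from the front with .popleft()
        self._pending_tasks = deque()

    async def _send_batch(self):
        await asyncio.sleep(0.1)  # stand-in for the HTTP POST of a log batch

    def _cleanup_completed_tasks(self):
        # Drop finished tasks from the front of the queue, surfacing any exception
        while self._pending_tasks and self._pending_tasks[0].done():
            task = self._pending_tasks.popleft()
            try:
                task.result()
            except Exception as exc:
                print(f"batch send failed: {exc}")

    def log(self):
        self._cleanup_completed_tasks()
        # Schedule the send instead of blocking on loop.run_until_complete(...)
        self._pending_tasks.append(self.loop.create_task(self._send_batch()))

    def close(self):
        # Drain whatever is still in flight, then close the loop
        pending = asyncio.all_tasks(self.loop)
        if pending:
            self.loop.run_until_complete(asyncio.gather(*pending))
        self.loop.close()


sender = NonBlockingSender()
for _ in range(3):
    sender.log()   # returns immediately; nothing blocks here
sender.close()     # the three queued sends run and complete here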
43 changes: 31 additions & 12 deletions src/zeroband/utils/monitor.py
@@ -1,3 +1,4 @@
+from collections import deque
 from typing import Any
 from zeroband.utils.logging import get_logger
 import aiohttp
@@ -9,7 +10,7 @@ async def _get_external_ip(max_retries=3, retry_delay=5):
     async with aiohttp.ClientSession() as session:
         for attempt in range(max_retries):
             try:
-                async with session.get('https://api.ipify.org', timeout=10) as response:
+                async with session.get("https://api.ipify.org", timeout=10) as response:
                     response.raise_for_status()
                     return await response.text()
             except ClientError:
@@ -41,8 +42,7 @@ def __init__(self, config, *args, **kwargs):
         self.loop = asyncio.new_event_loop()
         asyncio.set_event_loop(self.loop)
 
-    def __del__(self):
-        self.loop.close()
+        self._pending_tasks = deque()
 
     def _remove_duplicates(self):
         seen = set()
@@ -68,9 +68,33 @@ def log(self, data: dict[str, Any]):
 
         self._handle_send_batch()
 
+    def __del__(self):
+        # Ensure all pending tasks are completed before closing
+        if hasattr(self, "loop") and self.loop is not None:
+            try:
+                pending = asyncio.all_tasks(self.loop)
+                self.loop.run_until_complete(asyncio.gather(*pending))
+            except Exception as e:
+                self._logger.error(f"Error cleaning up pending tasks: {str(e)}")
+            finally:
+                self.loop.close()
+
+    def _cleanup_completed_tasks(self):
+        """Remove completed tasks from the pending tasks queue"""
+        while self._pending_tasks and self._pending_tasks[0].done():
+            task = self._pending_tasks.popleft()
+            try:
+                task.result()  # This will raise any exceptions that occurred
+            except Exception as e:
+                self._logger.error(f"Error in completed batch send task: {str(e)}")
+
     def _handle_send_batch(self, flush: bool = False):
+        self._cleanup_completed_tasks()
+
         if len(self.data) >= self.log_flush_interval or flush:
-            self.loop.run_until_complete(self._send_batch())
+            # Create a new task for sending the batch
+            task = self.loop.create_task(self._send_batch())
+            self._pending_tasks.append(task)
 
     async def _set_node_ip_address(self):
         if self.node_ip_address is None and self.node_ip_address_fetch_status != "failed":
@@ -89,16 +113,11 @@ async def _send_batch(self):
         self._remove_duplicates()
         await self._set_node_ip_address()
 
-        batch = self.data[:self.log_flush_interval]
+        batch = self.data[: self.log_flush_interval]
         # set node_ip_address of batch
         batch = [{**log, "node_ip_address": self.node_ip_address} for log in batch]
-        headers = {
-            "Content-Type": "application/json",
-            "Authorization": f"Bearer {self.auth_token}"
-        }
-        payload = {
-            "logs": batch
-        }
+        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.auth_token}"}
+        payload = {"logs": batch}
         api = f"{self.base_url}/metrics/{self.run_id}/logs"
 
         try: