From ffa434e61149f833ee4249fcc7fd3f7fab91d17e Mon Sep 17 00:00:00 2001 From: fab Date: Tue, 20 Aug 2024 16:17:44 +0200 Subject: [PATCH] Update resource_manager.py restored parallel processing --- lxc_autoscale/resource_manager.py | 56 +++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/lxc_autoscale/resource_manager.py b/lxc_autoscale/resource_manager.py index cde53c4..cb5c94a 100644 --- a/lxc_autoscale/resource_manager.py +++ b/lxc_autoscale/resource_manager.py @@ -11,20 +11,24 @@ from scaling_manager import manage_horizontal_scaling, adjust_resources # Import scaling management functions from notification import send_notification # Import notification function from config import HORIZONTAL_SCALING_GROUPS, IGNORE_LXC, DEFAULTS # Import configuration constants +from concurrent.futures import ThreadPoolExecutor, as_completed -def collect_container_data(): +def collect_data_for_container(ctid): """ - Collect resource usage data for all LXC containers. + Collect resource usage data for a single LXC container. + + Args: + ctid (str): The container ID. Returns: - dict: A dictionary where the keys are container IDs and the values are their respective data. + dict: The data collected for the container, or None if the container is not running. """ - containers = {} - for ctid in get_containers(): - if not is_container_running(ctid): - continue - logging.debug(f"Collecting data for container {ctid}...") - + if not is_container_running(ctid): + return None + + logging.debug(f"Collecting data for container {ctid}...") + + try: # Retrieve the current configuration of the container cores = int(run_command(f"pct config {ctid} | grep cores | awk '{{print $2}}'")) memory = int(run_command(f"pct config {ctid} | grep memory | awk '{{print $2}}'")) @@ -34,14 +38,38 @@ def collect_container_data(): backup_container_settings(ctid, settings) # Collect CPU and memory usage data - containers[ctid] = { - "cpu": get_cpu_usage(ctid), - "mem": get_memory_usage(ctid), - "initial_cores": cores, - "initial_memory": memory, + return { + ctid: { + "cpu": get_cpu_usage(ctid), + "mem": get_memory_usage(ctid), + "initial_cores": cores, + "initial_memory": memory, + } } + except Exception as e: + logging.error(f"Error collecting data for container {ctid}: {e}") + return None + +def collect_container_data(): + """ + Collect resource usage data for all LXC containers. + + Returns: + dict: A dictionary where the keys are container IDs and the values are their respective data. + """ + containers = {} + with ThreadPoolExecutor(max_workers=8) as executor: # Adjust max_workers based on your CPU capability + futures = {executor.submit(collect_data_for_container, ctid): ctid for ctid in get_containers()} + for future in as_completed(futures): + container_data = future.result() + if container_data: + containers.update(container_data) return containers + + + + def main_loop(poll_interval, energy_mode): """ Main loop that handles the resource allocation and scaling process.