From 7c6a47e3636d6c192657260247b215b96a122384 Mon Sep 17 00:00:00 2001 From: hdoupe Date: Sun, 19 Apr 2020 13:21:26 -0400 Subject: [PATCH 01/55] Refactor compute cluster logic to prepare for automatic updates/publishing --- .gitignore | 5 +- distributed/api/celery_app/__init__.py | 148 -------- distributed/api/endpoints.py | 73 +--- distributed/api/tests/test_celery.py | 15 - distributed/celery_io.sh | 4 - distributed/cs-dask-sim/cs_dask_sim.py | 18 +- distributed/cs_cluster.py | 343 +++--------------- distributed/dockerfiles/Dockerfile | 16 +- distributed/dockerfiles/Dockerfile.celerybase | 5 - distributed/dockerfiles/Dockerfile.flask | 19 +- .../dockerfiles/Dockerfile.outputs_processor | 24 ++ distributed/dockerfiles/Dockerfile.tasks | 51 --- distributed/kubernetes/flower-deployment.yaml | 40 -- distributed/outputs_processor.py | 57 +++ distributed/requirements.txt | 4 +- distributed/tasks_writer.py | 48 --- .../dask/scheduler-deployment.template.yaml | 41 --- .../dask/scheduler-service.template.yaml | 18 - .../dask/worker-deployment.template.yaml | 45 --- .../templates/flask-deployment.template.yaml | 24 +- ...outputs-processor-deployment.template.yaml | 39 ++ .../templates/sc-deployment.template.yaml | 45 --- distributed/templates/secret.template.yaml | 11 + distributed/templates/tasks_template.py | 58 --- distributed/worker_config.dev.yaml | 37 -- 25 files changed, 213 insertions(+), 975 deletions(-) delete mode 100644 distributed/api/celery_app/__init__.py delete mode 100644 distributed/api/tests/test_celery.py delete mode 100755 distributed/celery_io.sh delete mode 100755 distributed/dockerfiles/Dockerfile.celerybase create mode 100755 distributed/dockerfiles/Dockerfile.outputs_processor delete mode 100755 distributed/dockerfiles/Dockerfile.tasks delete mode 100644 distributed/kubernetes/flower-deployment.yaml create mode 100644 distributed/outputs_processor.py delete mode 100644 distributed/tasks_writer.py delete mode 100644 distributed/templates/dask/scheduler-deployment.template.yaml delete mode 100644 distributed/templates/dask/scheduler-service.template.yaml delete mode 100644 distributed/templates/dask/worker-deployment.template.yaml create mode 100755 distributed/templates/outputs-processor-deployment.template.yaml delete mode 100755 distributed/templates/sc-deployment.template.yaml create mode 100644 distributed/templates/secret.template.yaml delete mode 100644 distributed/templates/tasks_template.py delete mode 100644 distributed/worker_config.dev.yaml diff --git a/.gitignore b/.gitignore index 46a54b99..f9d5ba31 100755 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ secret-docker-compose.yml *docker-compose-apps* distributed/kubernetes/apps/* -dev-secret.yaml +secret.yaml -*flask-deployment.yaml \ No newline at end of file +*flask-deployment.yaml +*outputs-processor-deployment.yaml \ No newline at end of file diff --git a/distributed/api/celery_app/__init__.py b/distributed/api/celery_app/__init__.py deleted file mode 100644 index 1c2c7882..00000000 --- a/distributed/api/celery_app/__init__.py +++ /dev/null @@ -1,148 +0,0 @@ -import os -import time -import functools -import re -import traceback - -import requests -from celery import Celery -from celery.signals import task_postrun -from celery.result import AsyncResult - -import cs_storage - - -try: - from cs_config import functions -except ImportError as ie: - if os.environ.get("IS_FLASK", "False") == "True": - functions = None - else: - raise ie - - -COMP_URL = os.environ.get("COMP_URL") -COMP_API_TOKEN = 
os.environ.get("COMP_API_TOKEN") - -CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379") -CELERY_RESULT_BACKEND = os.environ.get( - "CELERY_RESULT_BACKEND", "redis://localhost:6379" -) - -OUTPUTS_VERSION = os.environ.get("OUTPUTS_VERSION") - - -def get_task_routes(): - def clean(name): - return re.sub("[^0-9a-zA-Z]+", "", name).lower() - - print(f"getting config from: {COMP_URL}/publish/api/") - resp = requests.get(f"{COMP_URL}/publish/api/") - if resp.status_code != 200: - raise Exception(f"Response status code: {resp.status_code}") - data = resp.json() - task_routes = {} - for project in data: - owner = clean(project["owner"]) - title = clean(project["title"]) - model = f"{owner}_{title}" - - # all apps use celery workers for handling their inputs. - routes = { - f"{model}_tasks.inputs_get": {"queue": f"{model}_inputs_queue"}, - f"{model}_tasks.inputs_parse": {"queue": f"{model}_inputs_queue"}, - f"{model}_tasks.inputs_version": {"queue": f"{model}_inputs_queue"}, - } - - # only add sim routes for models that use celery workers. - if project["cluster_type"] == "single-core": - routes[f"{model}_tasks.sim"] = {"queue": f"{model}_queue"} - - task_routes.update(routes) - return task_routes - - -task_routes = get_task_routes() - - -celery_app = Celery( - "celery_app", broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND -) -celery_app.conf.update( - task_serializer="json", - accept_content=["msgpack", "json"], - task_routes=task_routes, - worker_prefetch_multiplier=1, - task_acks_late=True, -) - - -def task_wrapper(func): - @functools.wraps(func) - def f(*args, **kwargs): - task = args[0] - task_id = task.request.id - start = time.time() - traceback_str = None - res = {} - try: - outputs = func(*args, **kwargs) - if task.name.endswith("sim"): - version = outputs.pop("version", OUTPUTS_VERSION) - if version == "v0": - res["model_version"] = "NA" - res.update(dict(outputs, **{"version": version})) - else: - outputs = cs_storage.write(task_id, outputs) - res.update( - { - "model_version": functions.get_version(), - "outputs": outputs, - "version": version, - } - ) - else: - res.update(outputs) - except Exception: - traceback_str = traceback.format_exc() - finish = time.time() - if "meta" not in res: - res["meta"] = {} - res["meta"]["task_times"] = [finish - start] - if traceback_str is None: - res["status"] = "SUCCESS" - else: - res["status"] = "FAIL" - res["traceback"] = traceback_str - return res - - return f - - -@task_postrun.connect -def post_results(sender=None, headers=None, body=None, **kwargs): - print(f'task_id: {kwargs["task_id"]}') - print(f'task: {kwargs["task"]} {kwargs["task"].name}') - print(f'is sim: {kwargs["task"].name.endswith("sim")}') - print(f'state: {kwargs["state"]}') - kwargs["retval"]["job_id"] = kwargs["task_id"] - if kwargs["task"].name.endswith("sim"): - print(f"posting data to {COMP_URL}/outputs/api/") - resp = requests.put( - f"{COMP_URL}/outputs/api/", - json=kwargs["retval"], - headers={"Authorization": f"Token {COMP_API_TOKEN}"}, - ) - print("resp", resp.status_code) - if resp.status_code == 400: - print("errors", resp.json()) - if kwargs["task"].name.endswith("parse"): - print(f"posting data to {COMP_URL}/inputs/api/") - resp = requests.put( - f"{COMP_URL}/inputs/api/", - json=kwargs["retval"], - headers={"Authorization": f"Token {COMP_API_TOKEN}"}, - ) - print("resp", resp.status_code) - if resp.status_code == 400: - print("errors", resp.json()) diff --git a/distributed/api/endpoints.py b/distributed/api/endpoints.py index 
206fc96a..7c183cbf 100644 --- a/distributed/api/endpoints.py +++ b/distributed/api/endpoints.py @@ -10,16 +10,14 @@ from flask import Blueprint, request, make_response from celery.result import AsyncResult from celery import chord -from distributed import Client, Future, fire_and_forget import redis import requests -from api.celery_app import celery_app -from cs_dask_sim import dask_sim, done_callback +from cs_publish.app import app as celery_app -COMP_URL = os.environ.get("COMP_URL") -COMP_API_TOKEN = os.environ.get("COMP_API_TOKEN") +CS_URL = os.environ.get("CS_URL") +CS_API_TOKEN = os.environ.get("CS_API_TOKEN") bp = Blueprint("endpoints", __name__) @@ -34,8 +32,8 @@ def clean(word): def get_cs_config(): - print(f"getting config from: {COMP_URL}/publish/api/") - resp = requests.get(f"{COMP_URL}/publish/api/") + print(f"getting config from: {CS_URL}/publish/api/") + resp = requests.get(f"{CS_URL}/publish/api/") if resp.status_code != 200: raise Exception(f"Response status code: {resp.status_code}") data = resp.json() @@ -66,11 +64,6 @@ def get_time_out(owner, app_name): return CONFIG[model_id]["time_out"] -def dask_scheduler_address(owner, app_name): - owner, app_name = clean(owner), clean(app_name) - return f"{owner}-{app_name}-dask-scheduler:8786" - - def async_endpoint(owner, app_name, compute_task): print(f"async endpoint {compute_task}") data = request.get_data() @@ -97,37 +90,6 @@ def sync_endpoint(owner, app_name, compute_task): return json.dumps(result) -def dask_endpoint(owner, app_name, action): - """ - Route dask simulation to appropriate dask scheduluer. - """ - print(f"dask endpoint: {owner}/{app_name}/{action}") - data = request.get_data() - inputs = json.loads(data) - print("inputs", inputs) - addr = dask_scheduler_address(owner, app_name) - job_id = str(uuid.uuid4()) - - # Worker needs the job_id to push the results back to the - # webapp. - # The url and api token are passed as args insted of env - # variables so that the wrapper has access to them - # but the model does not. 
- inputs.update( - { - "job_id": job_id, - "comp_url": os.environ.get("COMP_URL"), - "comp_api_token": os.environ.get("COMP_API_TOKEN"), - "timeout": get_time_out(owner, app_name), - } - ) - - with Client(addr) as c: - fut = c.submit(dask_sim, **inputs) - fire_and_forget(fut) - return {"job_id": job_id, "qlength": 1} - - def route_to_task(owner, app_name, endpoint, action): owner, app_name = clean(owner), clean(app_name) print("getting...", owner, app_name, endpoint, action) @@ -169,8 +131,6 @@ def endpoint_sim(owner, app_name): print(f"cluster type is {cluster_type}") if cluster_type == "single-core": return route_to_task(owner, app_name, async_endpoint, action) - elif cluster_type == "dask": - return dask_endpoint(owner, app_name, action) else: return json.dumps({"error": "model does not exist."}), 404 @@ -189,18 +149,6 @@ def results(owner, app_name, job_id): ) else: return make_response("not ready", 202) - elif cluster_type == "dask": - addr = dask_scheduler_address(owner, app_name) - with Client(addr) as client: - fut = Future(job_id, client=client) - if fut.done() and fut.status != "error": - return fut.result() - elif fut.done() and fut.status in ("error", "cancelled"): - return json.dumps( - {"status": "WORKER_FAILURE", "traceback": fut.traceback()} - ) - else: - return make_response("not ready", 202) else: return json.dumps({"error": "model does not exist."}), 404 @@ -218,17 +166,6 @@ def query_results(owner, app_name, job_id): return "FAIL" else: return "NO" - elif cluster_type == "dask": - addr = dask_scheduler_address(owner, app_name) - with Client(addr) as client: - fut = Future(job_id, client=client) - print("dask result", fut.status) - if fut.done() and fut.status != "error": - return "YES" - elif fut.done() and fut.status in ("error", "cancelled"): - return "FAIL" - else: - return "NO" else: return json.dumps({"error": "model does not exist."}), 404 diff --git a/distributed/api/tests/test_celery.py b/distributed/api/tests/test_celery.py deleted file mode 100644 index 42b6f298..00000000 --- a/distributed/api/tests/test_celery.py +++ /dev/null @@ -1,15 +0,0 @@ -# import pytest -# from celery import chord - -# @pytest.fixture(scope='session') -# def celery_config(): -# return { -# 'broker_url': 'redis://redis:6379/0', -# 'result_backend': 'redis://redis:6379/0', -# 'task_serializer': 'json', -# 'accept_content': ['msgpack', 'json']} - - -# def test_project_endpoint(celery_worker): -# # celery tests here. 
-# pass diff --git a/distributed/celery_io.sh b/distributed/celery_io.sh deleted file mode 100755 index 02d308d6..00000000 --- a/distributed/celery_io.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env bash -SAFEOWNER=$(python -c "import re, os; print(re.sub('[^0-9a-zA-Z]+', '', \"$1\").lower())") -SAFETITLE=$(python -c "import re, os; print(re.sub('[^0-9a-zA-Z]+', '', \"$2\").lower())") -celery -A celery_app.${SAFEOWNER}_${SAFETITLE}_tasks worker --loglevel=info --concurrency=1 -Q ${SAFEOWNER}_${SAFETITLE}_inputs_queue -n ${SAFEOWNER}_${SAFETITLE}_inputs@%h \ No newline at end of file diff --git a/distributed/cs-dask-sim/cs_dask_sim.py b/distributed/cs-dask-sim/cs_dask_sim.py index 2f60ff35..223a6ff6 100644 --- a/distributed/cs-dask-sim/cs_dask_sim.py +++ b/distributed/cs-dask-sim/cs_dask_sim.py @@ -13,15 +13,15 @@ functions = None -def done_callback(future, job_id, comp_url, comp_api_token, start_time): +def done_callback(future, job_id, cs_url, cs_api_token, start_time): """ This should be called like: callback = functools.partial( done_callback, job_id=job_id, - comp_url=os.environ.get("COMP_URL"), - comp_api_token=os.environ.get("comp_api_token"), + cs_url=os.environ.get("CS_URL"), + cs_api_token=os.environ.get("CS_API_TOKEN"), start_time=time.time() ) @@ -61,18 +61,18 @@ def done_callback(future, job_id, comp_url, comp_api_token, start_time): res["job_id"] = job_id print("got result", res) - print(f"posting data to {comp_url}/outputs/api/") + print(f"posting data to {cs_url}/outputs/api/") resp = requests.put( - f"{comp_url}/outputs/api/", + f"{cs_url}/outputs/api/", json=res, - headers={"Authorization": f"Token {comp_api_token}"}, + headers={"Authorization": f"Token {cs_api_token}"}, ) print("resp", resp.status_code) if resp.status_code == 400: print("errors", resp.json()) -def dask_sim(meta_param_dict, adjustment, job_id, comp_url, comp_api_token, timeout): +def dask_sim(meta_param_dict, adjustment, job_id, cs_url, cs_api_token, timeout): """ Wraps the functions.run_model function with a dask future and adds a callback for pushing the results back to the webapp. 
The callback is @@ -88,8 +88,8 @@ def dask_sim(meta_param_dict, adjustment, job_id, comp_url, comp_api_token, time partialled_cb = partial( done_callback, job_id=job_id, - comp_url=comp_url, - comp_api_token=comp_api_token, + cs_url=cs_url, + cs_api_token=cs_api_token, start_time=start_time, ) with worker_client() as c: diff --git a/distributed/cs_cluster.py b/distributed/cs_cluster.py index fff26a60..0d1293e7 100644 --- a/distributed/cs_cluster.py +++ b/distributed/cs_cluster.py @@ -48,55 +48,22 @@ class Cluster: """ k8s_target = "kubernetes/" - k8s_app_target = "kubernetes/apps" cr = "gcr.io" - def __init__(self, config, tag, project, models=None): + def __init__(self, tag, project): self.tag = tag self.project = project - self.models = models if models and models[0] else None - - with open(config, "r") as f: - self.config = yaml.safe_load(f.read()) with open("templates/flask-deployment.template.yaml", "r") as f: self.flask_template = yaml.safe_load(f.read()) - with open("templates/sc-deployment.template.yaml", "r") as f: - self.sc_template = yaml.safe_load(f.read()) - - with open("templates/dask/scheduler-deployment.template.yaml", "r") as f: - self.dask_scheduler_template = yaml.safe_load(f.read()) - - with open("templates/dask/scheduler-service.template.yaml", "r") as f: - self.dask_scheduler_service_template = yaml.safe_load(f.read()) + with open("templates/outputs-processor-deployment.template.yaml", "r") as f: + self.outputs_processor_template = yaml.safe_load(f.read()) - with open("templates/dask/worker-deployment.template.yaml", "r") as f: - self.dask_worker_template = yaml.safe_load(f.read()) + with open("templates/secret.template.yaml", "r") as f: + self.secret_template = yaml.safe_load(f.read()) def build(self): - """ - Wrap all methods that build, tag, and push the images as well as - write the k8s config fiels. - """ - self.build_base_images() - self.write_flask_deployment() - self.build_apps() - - def apply(self): - """ - Experimental. Apply k8s config files to existing k8s cluster. - """ - run(f"kubectl apply -f {self.k8s_target}") - run(f"kubectl apply -f {self.k8s_app_target}") - - def dry_run(self): - self.write_flask_deployment() - for app in self.config: - for action in ["io", "sim"]: - self.write_app_deployment(app, action) - - def build_base_images(self): """ Build, tag, and push base images for the flask app and modeling apps. @@ -104,297 +71,85 @@ def build_base_images(self): pull from either distributed:latest or celerybase:latest. 
""" run("docker build -t distributed:latest -f dockerfiles/Dockerfile ./") - run("docker build -t celerybase:latest -f dockerfiles/Dockerfile.celerybase ./") + run( + f"docker build -t outputs_processor:{self.tag} -f dockerfiles/Dockerfile.outputs_processor ./" + ) run(f"docker build -t flask:{self.tag} -f dockerfiles/Dockerfile.flask ./") - for img_name in ["distributed", "celerybase"]: - run(f"docker tag {img_name} {self.cr}/{self.project}/{img_name}:latest") - run(f"docker push {self.cr}/{self.project}/{img_name}:latest") + run(f"docker tag distributed {self.cr}/{self.project}/distributed:latest") + run(f"docker push {self.cr}/{self.project}/distributed:latest") + + run( + f"docker tag outputs_processor:{self.tag} {self.cr}/{self.project}/outputs_processor:{self.tag}" + ) + run(f"docker push {self.cr}/{self.project}/outputs_processor:{self.tag}") run(f"docker tag flask:{self.tag} {self.cr}/{self.project}/flask:{self.tag}") run(f"docker push {self.cr}/{self.project}/flask:{self.tag}") + def make_config(self): + self.write_flask_deployment() + self.write_outputs_processor_deployment() + self.write_secret() + def write_flask_deployment(self): """ Write flask deployment file. Only step is filling in the image uri. """ - flask_deployment = copy.deepcopy(self.flask_template) - flask_deployment["spec"]["template"]["spec"]["containers"][0][ + deployment = copy.deepcopy(self.flask_template) + deployment["spec"]["template"]["spec"]["containers"][0][ "image" ] = f"gcr.io/{self.project}/flask:{self.tag}" with open(f"{self.k8s_target}/flask-deployment.yaml", "w") as f: - f.write(yaml.dump(flask_deployment)) + f.write(yaml.dump(deployment)) - return flask_deployment + return deployment - def build_apps(self): - """ - Build, tag, and push images and write k8s config files - for all apps in config. Filters out those not in models - list, if applicable. - """ - # ensure clean path. - path = Path(self.k8s_app_target) - path.mkdir(exist_ok=True) - stale_files = path.glob("*yaml") - _ = [sf.unlink() for sf in stale_files] - - for app in self.config: - if self.models and app["title"] not in self.models[0]: - continue - try: - self.build_app_image(app) - except Exception as e: - print( - f"There was an error building: " - f"{app['title']}/{app['owner']}:{self.tag}" - ) - print(e) - continue - - for action in ["io", "sim"]: - self.write_app_deployment(app, action) - - def build_app_image(self, app): + def write_outputs_processor_deployment(self): """ - Build, tag, and pus the image for a single app. + Write outputs processor deployment file. Only step is filling + in the image uri. 
""" - safeowner = clean(app["owner"]) - safetitle = clean(app["title"]) - img_name = f"{safeowner}_{safetitle}_tasks" - - reg_url = "https://github.com" - raw_url = "https://raw.githubusercontent.com" - - buildargs = dict( - OWNER=app["owner"], - TITLE=app["title"], - BRANCH=app["branch"], - SAFEOWNER=safeowner, - SAFETITLE=safetitle, - SIM_TIME_LIMIT=app["sim_time_limit"], - REPO_URL=app["repo_url"], - RAW_REPO_URL=app["repo_url"].replace(reg_url, raw_url), - **app["env"], - ) + deployment = copy.deepcopy(self.outputs_processor_template) + deployment["spec"]["template"]["spec"]["containers"][0][ + "image" + ] = f"gcr.io/{self.project}/outputs_processor:{self.tag}" - buildargs_str = " ".join( - [f"--build-arg {arg}={value}" for arg, value in buildargs.items()] - ) - cmd = ( - f"docker build {buildargs_str} -t {img_name}:{self.tag} " - f"-f dockerfiles/Dockerfile.tasks ./" - ) - run(cmd) + with open(f"{self.k8s_target}/outputs-processor-deployment.yaml", "w") as f: + f.write(yaml.dump(deployment)) - run( - f"docker tag {img_name}:{self.tag} {self.cr}/{self.project}/{img_name}:{self.tag}" - ) - run(f"docker push {self.cr}/{self.project}/{img_name}:{self.tag}") + return deployment - def write_app_deployment(self, app, action): - """ - Write k8s config file for an app. + def write_secret(self): + secrets = copy.deepcopy(self.secret_template) + secrets["stringData"]["CS_API_TOKEN"] = self._get_secret("CS_API_TOKEN") - Note: Dask uses a dot notation for specifying paths - in their config. It could be helpful for us to - do that, too. + with open(f"{self.k8s_target}/secret.yaml", "w") as f: + f.write(yaml.dump(secrets)) - Also, all io (inputs) apps are deployed as a - single-core cluster. - """ - if action == "io": - self.write_sc_app(app, action) - elif app["cluster_type"] == "dask": - self.write_dask_app(app, action) - elif app["cluster_type"] == "single-core": - self.write_sc_app(app, action) - else: - raise RuntimeError(f"Cluster type {app['cluster_type']} unknown.") - - def write_dask_app(self, app, action): - self._write_dask_worker_app(app) - self._write_dask_scheduler_app(app) - self._write_dask_scheduler_service(app) - - def _write_dask_worker_app(self, app): - app_deployment = copy.deepcopy(self.dask_worker_template) - safeowner = clean(app["owner"]) - safetitle = clean(app["title"]) - name = f"{safeowner}-{safetitle}-dask-worker" - image = f"{self.cr}/{self.project}/{safeowner}_{safetitle}_tasks:{self.tag}" - - app_deployment["metadata"]["name"] = name - app_deployment["metadata"]["labels"]["app"] = name - app_deployment["spec"]["replicas"] = app.get("replicas", 1) - app_deployment["spec"]["selector"]["matchLabels"]["app"] = name - app_deployment["spec"]["template"]["metadata"]["labels"]["app"] = name - if "affinity" in app: - affinity_exp = {"key": "model", "operator": "In", "values": [app["affinity"]["model"]]} - app_deployment["spec"]["template"]["spec"]["affinity"] = { - "nodeAffinity": { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [{"matchExpressions": [affinity_exp]}] - } - } - } - - container_config = app_deployment["spec"]["template"]["spec"]["containers"][0] - - resources, _ = self._resources(app, action="sim") - container_config.update( - { - "name": name, - "image": image, - "args": [ - "dask-worker", - f"{safeowner}-{safetitle}-dask-scheduler:8786", - "--nthreads", - str(resources["limits"]["cpu"]), - "--memory-limit", - str(resources["limits"]["memory"]), - "--no-bokeh", - ], - "resources": resources, - } - ) - 
container_config["env"].append( - { - "name": "DASK_SCHEDULER_ADDRESS", - "value": f"{safeowner}-{safetitle}-dask-scheduler:8786", - } - ) + def _get_secret(self, secret_name): + from google.cloud import secretmanager - self._set_secrets(app, container_config) - - with open(f"{self.k8s_app_target}/{name}-deployment.yaml", "w") as f: - f.write(yaml.dump(app_deployment)) - - return app_deployment - - def _write_dask_scheduler_app(self, app): - app_deployment = copy.deepcopy(self.dask_scheduler_template) - safeowner = clean(app["owner"]) - safetitle = clean(app["title"]) - name = f"{safeowner}-{safetitle}-dask-scheduler" - - app_deployment["metadata"]["name"] = name - app_deployment["metadata"]["labels"]["app"] = name - app_deployment["spec"]["selector"]["matchLabels"]["app"] = name - app_deployment["spec"]["template"]["metadata"]["labels"]["app"] = name - app_deployment["spec"]["template"]["spec"]["containers"][0]["name"] = name - - with open(f"{self.k8s_app_target}/{name}-deployment.yaml", "w") as f: - f.write(yaml.dump(app_deployment)) - - return app_deployment - - def _write_dask_scheduler_service(self, app): - app_service = copy.deepcopy(self.dask_scheduler_service_template) - safeowner = clean(app["owner"]) - safetitle = clean(app["title"]) - name = f"{safeowner}-{safetitle}-dask-scheduler" - - app_service["metadata"]["name"] = name - app_service["metadata"]["labels"]["app"] = name - app_service["spec"]["selector"]["app"] = name - - app_service["spec"]["ports"][0]["name"] = name - app_service["spec"]["ports"][1]["name"] = f"{safeowner}-{safetitle}-dask-webui" - - with open(f"{self.k8s_app_target}/{name}-service.yaml", "w") as f: - f.write(yaml.dump(app_service)) - - return app_service - - def write_sc_app(self, app, action): - app_deployment = copy.deepcopy(self.sc_template) - safeowner = clean(app["owner"]) - safetitle = clean(app["title"]) - name = f"{safeowner}-{safetitle}-{action}" - - resources, affinity_size = self._resources(app, action) - - if not isinstance(affinity_size, list): - affinity_size = [affinity_size] - - app_deployment["metadata"]["name"] = name - app_deployment["spec"]["selector"]["matchLabels"]["app"] = name - app_deployment["spec"]["template"]["metadata"]["labels"]["app"] = name - if "affinity" in app and action == "sim": - affinity_exp = {"key": "size", "operator": "In", "values": affinity_size} - app_deployment["spec"]["template"]["spec"]["affinity"] = { - "nodeAffinity": { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [{"matchExpressions": [affinity_exp]}] - } - } - } - - container_config = app_deployment["spec"]["template"]["spec"]["containers"][0] - - container_config.update( - { - "name": name, - "image": f"{self.cr}/{self.project}/{safeowner}_{safetitle}_tasks:{self.tag}", - "command": [f"./celery_{action}.sh"], - "args": [ - app["owner"], - app["title"], - ], # TODO: pass safe names to docker file at build and run time - "resources": resources, - } + client = secretmanager.SecretManagerServiceClient() + response = client.access_secret_version( + f"projects/{self.project}/secrets/{secret_name}/versions/latest" ) - container_config["env"].append({"name": "TITLE", "value": app["title"]}) - container_config["env"].append({"name": "OWNER", "value": app["owner"]}) - - self._set_secrets(app, container_config) - - with open(f"{self.k8s_app_target}/{name}-deployment.yaml", "w") as f: - f.write(yaml.dump(app_deployment)) - - return app_deployment - - def _resources(self, app, action): - if action == "io": - resources = { - "requests": 
{"cpu": 0.7, "memory": "0.25G"}, - "limits": {"cpu": 1, "memory": "0.7G"}, - } - affinity_size = ["small", "medium"] - else: - resources = {"requests": {"memory": "1G", "cpu": 1}} - resources = dict(resources, **copy.deepcopy(app["resources"])) - affinity_size = app.get("affinity", {}).get("size", ["small", "medium"]) - return resources, affinity_size - - def _set_secrets(self, app, config): - # TODO: write secrets to secret config files instead of env. - if app.get("secret"): - for var, val in app["secret"].items(): - config["env"].append({"name": var.upper(), "value": val}) + return response.payload.data.decode("utf-8") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Deploy C/S compute cluster.") - parser.add_argument("--config", required=True) parser.add_argument("--tag", required=False, default=TAG) parser.add_argument("--project", required=False, default=PROJECT) - parser.add_argument("--models", nargs="+", type=str, required=False, default=None) parser.add_argument("--build", action="store_true") - parser.add_argument("--dry-run", action="store_true") - parser.add_argument("--build-base-only", action="store_true") + parser.add_argument("--make-config", action="store_true") args = parser.parse_args() - cluster = Cluster( - config=args.config, tag=args.tag, project=args.project, models=args.models - ) + cluster = Cluster(tag=args.tag, project=args.project) if args.build: cluster.build() - elif args.dry_run: - cluster.dry_run() - elif args.build_base_only: - cluster.build_base_images() + if args.make_config: + cluster.make_config() diff --git a/distributed/dockerfiles/Dockerfile b/distributed/dockerfiles/Dockerfile index b165247f..9b62226e 100755 --- a/distributed/dockerfiles/Dockerfile +++ b/distributed/dockerfiles/Dockerfile @@ -3,22 +3,10 @@ FROM continuumio/miniconda3 USER root RUN apt-get update && apt install libgl1-mesa-glx --yes -# install packages for chromium -RUN apt-get update && \ - apt-get install -yq --no-install-recommends \ - libasound2 libatk1.0-0 libc6 libcairo2 libcups2 libdbus-1-3 \ - libexpat1 libfontconfig1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0 libglib2.0-0 libgtk-3-0 libnspr4 \ - libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1 \ - libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6 \ - libnss3 - -RUN mkdir /home/distributed -RUN mkdir /home/distributed/api - RUN conda update conda RUN conda config --append channels conda-forge RUN conda install "python>=3.7" pip -COPY requirements.txt home/distributed +COPY requirements.txt /home -WORKDIR /home/distributed \ No newline at end of file +WORKDIR /home \ No newline at end of file diff --git a/distributed/dockerfiles/Dockerfile.celerybase b/distributed/dockerfiles/Dockerfile.celerybase deleted file mode 100755 index 58d26d1f..00000000 --- a/distributed/dockerfiles/Dockerfile.celerybase +++ /dev/null @@ -1,5 +0,0 @@ -ARG TAG -FROM distributed - -ENV CELERY_BROKER_URL redis://redis-master/0 -ENV CELERY_RESULT_BACKEND redis://redis-master/0 diff --git a/distributed/dockerfiles/Dockerfile.flask b/distributed/dockerfiles/Dockerfile.flask index 4c567fb9..ff88b93b 100755 --- a/distributed/dockerfiles/Dockerfile.flask +++ b/distributed/dockerfiles/Dockerfile.flask @@ -1,10 +1,6 @@ ARG TAG FROM distributed -LABEL build="flask" date="2018-06-13" - -USER root - ENV CELERY_BROKER_URL redis://redis-master/0 ENV CELERY_RESULT_BACKEND redis://redis-master/0 @@ -18,19 +14,14 @@ ENV IS_FLASK True EXPOSE 80 EXPOSE 5050 -RUN conda install -c 
conda-forge dask distributed RUN pip install -r requirements.txt -COPY ./cs-dask-sim /home/distributed/cs-dask-sim -RUN cd /home/distributed/cs-dask-sim && pip install -e . - -COPY ./api /home/distributed/api -COPY ./api/endpoints.py /home/distributed/api -COPY ./api/celery_app/__init__.py /home/distributed/api/celery_app/__init__.py -COPY ./setup.py /home/distributed -RUN cd /home/distributed && pip install -e . +COPY ./api /home/api +COPY ./api/endpoints.py /home/api +COPY ./setup.py /home +RUN cd /home && pip install -e . -WORKDIR /home/distributed/api +WORKDIR /home/api # run the app server CMD ["gunicorn", "--bind", "0.0.0.0:5050", "api:app", "--access-logfile", "-"] diff --git a/distributed/dockerfiles/Dockerfile.outputs_processor b/distributed/dockerfiles/Dockerfile.outputs_processor new file mode 100755 index 00000000..5a0e8e6d --- /dev/null +++ b/distributed/dockerfiles/Dockerfile.outputs_processor @@ -0,0 +1,24 @@ +ARG TAG +FROM distributed + +# install packages for chromium +RUN apt-get update && \ + apt-get install -yq --no-install-recommends \ + libasound2 libatk1.0-0 libc6 libcairo2 libcups2 libdbus-1-3 \ + libexpat1 libfontconfig1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0 libglib2.0-0 libgtk-3-0 libnspr4 \ + libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1 \ + libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6 \ + libnss3 + + +RUN pip install -r requirements.txt +RUN conda install -c conda-forge jinja2 bokeh pyppeteer "pyee<6" && pyppeteer-install + +ENV CELERY_BROKER_URL redis://redis-master/0 +ENV CELERY_RESULT_BACKEND redis://redis-master/0 + +COPY outputs_processor.py /home + +WORKDIR /home + +CMD ["celery", "-A", "outputs_processor", "worker", "--loglevel=info", "--concurrency=1", "-n", "outputs_processor@%h"] \ No newline at end of file diff --git a/distributed/dockerfiles/Dockerfile.tasks b/distributed/dockerfiles/Dockerfile.tasks deleted file mode 100755 index d89fb713..00000000 --- a/distributed/dockerfiles/Dockerfile.tasks +++ /dev/null @@ -1,51 +0,0 @@ -ARG TAG -FROM celerybase - -# install packages here -# install packages necessary for celery and creating screenshots -RUN pip install -r requirements.txt -RUN conda install -c conda-forge lz4 -RUN conda install -c conda-forge jinja2 pyppeteer && pyppeteer-install - -ARG TITLE -ARG OWNER -ARG REPO_URL -ARG RAW_REPO_URL -ARG BRANCH=master - -# Install necessary packages, copying files, etc. -###################### -# Bump to trigger build -ARG BUILD_NUM=0 - -ADD ${RAW_REPO_URL}/${BRANCH}/cs-config/install.sh /home -RUN cat /home/install.sh -RUN bash /home/install.sh - -# Bump to trigger re-install of source, without re-installing dependencies. 
-ARG INSTALL_NUM=0 -RUN pip install "git+${REPO_URL}.git@${BRANCH}#egg=cs-config&subdirectory=cs-config" -ADD ${RAW_REPO_URL}/${BRANCH}/cs-config/cs_config/tests/test_functions.py /home -RUN pip install cs-kit -RUN py.test /home/test_functions.py -v -s -###################### - -ARG SIM_TIME_LIMIT -COPY templates/tasks_template.py tasks_template.py -COPY tasks_writer.py tasks_writer.py -RUN mkdir /home/distributed/api/celery_app -RUN python tasks_writer.py --owner ${OWNER} --title ${TITLE} --sim-time-limit ${SIM_TIME_LIMIT} --out /home/distributed/api/celery_app - -# copy over necessary files for this project's celery app -COPY ./api/__init__.py /home/distributed/api/__init__.py -COPY ./api/celery_app/__init__.py /home/distributed/api/celery_app/__init__.py -COPY ./setup.py /home/distributed -RUN cd /home/distributed && pip install -e . - -WORKDIR /home/distributed/api - -COPY celery_sim.sh /home/distributed/api/celery_sim.sh -COPY celery_io.sh /home/distributed/api/celery_io.sh - -COPY ./cs-dask-sim /home/distributed/cs-dask-sim -RUN cd /home/distributed/cs-dask-sim && pip install -e . diff --git a/distributed/kubernetes/flower-deployment.yaml b/distributed/kubernetes/flower-deployment.yaml deleted file mode 100644 index 9ea42beb..00000000 --- a/distributed/kubernetes/flower-deployment.yaml +++ /dev/null @@ -1,40 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: flower-monitor -spec: - replicas: 1 - selector: - matchLabels: - app: flower-monitor - template: - metadata: - labels: - app: flower-monitor - spec: - containers: - - name: flower-monitor - image: mher/flower - imagePullPolicy: Always - command: ["flower"] - args: ["--broker=redis://redis-master/0", "--port=8888"] - env: - - name: GET_HOSTS_FROM - value: dns - - name: FLOWER_PORT - value: "8888" - ports: - - containerPort: 8888 - resources: - requests: - memory: "95Mi" - cpu: "75m" - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: size - operator: In - values: - - small diff --git a/distributed/outputs_processor.py b/distributed/outputs_processor.py new file mode 100644 index 00000000..d0c64884 --- /dev/null +++ b/distributed/outputs_processor.py @@ -0,0 +1,57 @@ +import os + +import requests +from celery import Celery + +import cs_storage + + +CS_URL = os.environ.get("CS_URL") +CS_API_TOKEN = os.environ.get("CS_API_TOKEN") + +CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379") +CELERY_RESULT_BACKEND = os.environ.get( + "CELERY_RESULT_BACKEND", "redis://localhost:6379" +) + +app = Celery( + "outputs_processor", broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND +) +app.conf.update( + task_serializer="json", + accept_content=["json"], + worker_prefetch_multiplier=1, + task_acks_late=True, +) + + +@app.task(name="outputs_processor.write_to_storage") +def write(task_id, outputs): + outputs = cs_storage.deserialize_from_json(outputs) + res = cs_storage.write(task_id, outputs) + print(res) + return res + + +@app.task(name="outputs_processor.push_to_cs") +def push(task_type, payload): + if task_type == "sim": + print(f"posting data to {CS_URL}/outputs/api/") + resp = requests.put( + f"{CS_URL}/outputs/api/", + json=payload, + headers={"Authorization": f"Token {CS_API_TOKEN}"}, + ) + print("resp", resp.status_code) + if resp.status_code == 400: + print("errors", resp.json()) + if task_type == "parse": + print(f"posting data to {CS_URL}/inputs/api/") + resp = requests.put( + f"{CS_URL}/inputs/api/", + 
json=payload, + headers={"Authorization": f"Token {CS_API_TOKEN}"}, + ) + print("resp", resp.status_code) + if resp.status_code == 400: + print("errors", resp.json()) diff --git a/distributed/requirements.txt b/distributed/requirements.txt index dc486c9d..aa722c64 100755 --- a/distributed/requirements.txt +++ b/distributed/requirements.txt @@ -7,4 +7,6 @@ flask toolz gunicorn boto3 -cs-storage>=1.8.1 \ No newline at end of file +pyyaml +cs-storage>=1.10.1 +git+https://github.com/compute-tooling/compute-studio-publish.git \ No newline at end of file diff --git a/distributed/tasks_writer.py b/distributed/tasks_writer.py deleted file mode 100644 index 914a8bcc..00000000 --- a/distributed/tasks_writer.py +++ /dev/null @@ -1,48 +0,0 @@ -import argparse -import yaml -import re -import os - -from jinja2 import Template - - -def clean(word): - return re.sub("[^0-9a-zA-Z]+", "", word).lower() - - -def template(owner, title, sim_time_limit, out): - owner = clean(owner) - title = clean(title) - print(owner, title) - with open("tasks_template.py") as f: - text = f.read() - - t = Template(text) - - r = t.render(APP_NAME=f"{owner}_{title}_tasks", SIM_TIME_LIMIT=sim_time_limit) - - with open(os.path.join(out, f"{owner}_{title}_tasks.py"), "w") as f: - f.write(r) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Write tasks modules from template.") - parser.add_argument("--owner") - parser.add_argument("--title") - parser.add_argument("--sim-time-limit", dest="sim_time_limit", type=int) - parser.add_argument("--config") - parser.add_argument("--out", "-o", default="api/celery_app") - parser.add_argument("--models", nargs="+", type=str, required=False, default=None) - args = parser.parse_args() - models = args.models if args.models and args.models[0] else None - if args.config: - with open(args.config) as f: - config = yaml.safe_load(f.read()) - for obj in config: - if models and obj["title"] not in models: - continue - template(obj["owner"], obj["title"], obj["sim_time_limit"], args.out) - elif args.owner and args.title and args.sim_time_limit: - template(args.owner, args.title, args.sim_time_limit, args.out) - else: - print("No arguments received.") diff --git a/distributed/templates/dask/scheduler-deployment.template.yaml b/distributed/templates/dask/scheduler-deployment.template.yaml deleted file mode 100644 index 91903ac9..00000000 --- a/distributed/templates/dask/scheduler-deployment.template.yaml +++ /dev/null @@ -1,41 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: # [owner]-[title]-dask-scheduler - labels: - app: # [owner]-[title]-dask-scheduler - component: scheduler -spec: - replicas: 1 - selector: - matchLabels: - app: # [owner]-[title]-dask-scheduler - component: scheduler - strategy: - type: RollingUpdate - template: - metadata: - labels: - app: # [owner]-[title]-dask-scheduler - component: scheduler - spec: - containers: - - name: # [owner]-[title]-dask-scheduler - image: daskdev/dask:latest - imagePullPolicy: IfNotPresent - args: - - dask-scheduler - - --port - - "8786" - - --bokeh-port - - "8787" - ports: - - containerPort: 8786 - - containerPort: 8787 - resources: - requests: - cpu: 0.5 - memory: 2G - limits: - cpu: 1.8 - memory: 6G diff --git a/distributed/templates/dask/scheduler-service.template.yaml b/distributed/templates/dask/scheduler-service.template.yaml deleted file mode 100644 index ca6e92a5..00000000 --- a/distributed/templates/dask/scheduler-service.template.yaml +++ /dev/null @@ -1,18 +0,0 @@ -apiVersion: v1 -kind: Service 
-metadata: - name: # [owner]-[title]-dask-scheduler - labels: - app: # [owner]-[title]-dask-scheduler - component: scheduler -spec: - ports: - - name: # [owner]-[title]-dask-scheduler - port: 8786 - targetPort: 8786 - - name: # [owner]-[title]-dask-webui - port: 8787 - targetPort: 8787 - selector: - app: # [owner]-[title]-dask-scheduler - component: scheduler diff --git a/distributed/templates/dask/worker-deployment.template.yaml b/distributed/templates/dask/worker-deployment.template.yaml deleted file mode 100644 index 91349d38..00000000 --- a/distributed/templates/dask/worker-deployment.template.yaml +++ /dev/null @@ -1,45 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: # [owner]-[title]-dask-worker - labels: - app: # [owner]-[title]-dask-worker - component: worker -spec: - replicas: 1 - selector: - matchLabels: - app: # [owner]-[title]-dask-worker - component: worker - strategy: - type: RollingUpdate - template: - metadata: - labels: - app: # [owner]-[title]-worker - component: worker - spec: - containers: - - name: # [owner]-[tutle]-dask-worker - image: # gcr.io/[project]/[owner]_[title]_tasks:[tag] - imagePullPolicy: Always - args: - - dask-worker - - # [owner]-[title]-dask-scheduler:8786 - - --nthreads - - # cpus - - --memory-limit - - # memory limit - - --no-bokeh - ports: - - containerPort: 8789 - resources: - env: - - name: DASK_DISTRIBUTED__DAEMON - value: "false" - - name: BUCKET - valueFrom: - secretKeyRef: - name: worker-secret - key: BUCKET - # affinity: diff --git a/distributed/templates/flask-deployment.template.yaml b/distributed/templates/flask-deployment.template.yaml index de0be4ef..7e113ff1 100755 --- a/distributed/templates/flask-deployment.template.yaml +++ b/distributed/templates/flask-deployment.template.yaml @@ -22,27 +22,15 @@ spec: ports: - containerPort: 5050 env: - - name: COMP_URL + - name: CS_URL valueFrom: secretKeyRef: name: worker-secret - key: COMP_URL - - name: COMP_API_TOKEN + key: CS_URL + - name: CS_API_TOKEN valueFrom: secretKeyRef: name: worker-secret - key: COMP_API_TOKEN - - name: BUCKET - valueFrom: - secretKeyRef: - name: worker-secret - key: BUCKET - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: size - operator: In - values: - - small + key: CS_API_TOKEN + nodeSelector: + component: api \ No newline at end of file diff --git a/distributed/templates/outputs-processor-deployment.template.yaml b/distributed/templates/outputs-processor-deployment.template.yaml new file mode 100755 index 00000000..baa6de6c --- /dev/null +++ b/distributed/templates/outputs-processor-deployment.template.yaml @@ -0,0 +1,39 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: outputs-processor +spec: + replicas: 1 + selector: + matchLabels: + app: outputs-processor + template: + metadata: + labels: + app: outputs-processor + spec: + containers: + - name: outputs-processor + image: + imagePullPolicy: Always + env: + - name: GET_HOSTS_FROM + value: dns + env: + - name: BUCKET + valueFrom: + secretKeyRef: + name: worker-secret + key: BUCKET + - name: CS_URL + valueFrom: + secretKeyRef: + name: worker-secret + key: CS_URL + - name: CS_API_TOKEN + valueFrom: + secretKeyRef: + name: worker-secret + key: CS_API_TOKEN + nodeSelector: + component: api \ No newline at end of file diff --git a/distributed/templates/sc-deployment.template.yaml b/distributed/templates/sc-deployment.template.yaml deleted file mode 100755 index 3ce428a4..00000000 --- 
a/distributed/templates/sc-deployment.template.yaml +++ /dev/null @@ -1,45 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: # [owner]-[title]-[action] -spec: - replicas: 1 - selector: - matchLabels: - app: # [owner]-[title]-[action] - template: - metadata: - labels: - app: # [owner]-[title]-[action] - spec: - containers: - - name: # [owner]-[title]-[action] - image: # gcr.io/[project]/[owner]_[title]_tasks:[tag] - imagePullPolicy: Always - command: [] # ["./celery_[action].sh"] - args: [] # ["[owner]", "[title"] - resources: - requests: - memory: - cpu: - limits: - memory: - cpu: - env: - - name: OUTPUTS_VERSION - value: v1 - - name: COMP_URL - valueFrom: - secretKeyRef: - name: worker-secret - key: COMP_URL - - name: COMP_API_TOKEN - valueFrom: - secretKeyRef: - name: worker-secret - key: COMP_API_TOKEN - - name: BUCKET - valueFrom: - secretKeyRef: - name: worker-secret - key: BUCKET diff --git a/distributed/templates/secret.template.yaml b/distributed/templates/secret.template.yaml new file mode 100644 index 00000000..84d058f4 --- /dev/null +++ b/distributed/templates/secret.template.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Secret +metadata: + name: worker-secret +type: Opaque +stringData: + CS_URL: https://dev.compute.studio + BUCKET: cs-outputs-dev + CS_API_TOKEN: + + OUTPUTS_VERSION: "v1" diff --git a/distributed/templates/tasks_template.py b/distributed/templates/tasks_template.py deleted file mode 100644 index 66cc7fa1..00000000 --- a/distributed/templates/tasks_template.py +++ /dev/null @@ -1,58 +0,0 @@ -import time -import os - -from api.celery_app import celery_app, task_wrapper - -try: - from cs_config import functions -except ImportError as ie: - if os.environ.get("IS_FLASK", "False") == "True": - functions = None - else: - raise ie - - -@celery_app.task( - name="{{APP_NAME}}.inputs_version", soft_time_limit=10, bind=True, acks_late=True -) -@task_wrapper -def inputs_version(self): - return {"version": functions.get_version()} - - -@celery_app.task( - name="{{APP_NAME}}.inputs_get", soft_time_limit=10, bind=True, acks_late=True -) -@task_wrapper -def inputs_get(self, meta_param_dict): - return functions.get_inputs(meta_param_dict) - - -@celery_app.task( - name="{{APP_NAME}}.inputs_parse", soft_time_limit=10, bind=True, acks_late=True -) -@task_wrapper -def inputs_parse(self, meta_param_dict, adjustment, errors_warnings): - return functions.validate_inputs(meta_param_dict, adjustment, errors_warnings) - - -@celery_app.task( - name="{{APP_NAME}}.sim", - soft_time_limit={{SIM_TIME_LIMIT}}, - bind=True, - acks_late=True, -) -@task_wrapper -def sim(self, meta_param_dict, adjustment): - if os.environ.get("DASK_SCHEDULER_ADDRESS") is not None: - from distributed import Client - from dask import delayed - - print("submitting data") - with Client() as c: - print("c", c) - fut = c.submit(functions.run_model, meta_param_dict, adjustment) - print("waiting on result", fut) - return fut.result() - - return functions.run_model(meta_param_dict, adjustment) diff --git a/distributed/worker_config.dev.yaml b/distributed/worker_config.dev.yaml deleted file mode 100644 index ee0d5f97..00000000 --- a/distributed/worker_config.dev.yaml +++ /dev/null @@ -1,37 +0,0 @@ -- owner: hdoupe - title: Matchups - branch: master - repo_url: https://github.com/hdoupe/Matchups - cluster_type: single-core - env: - BUILD_NUM: 0 - affinity: - size: small - secrets: - some_secret: hello world - resources: - limits: - cpu: 1000m - memory: 4000Mi - requests: - cpu: 500m - memory: 300Mi - 
sim_time_limit: 60 -- owner: PSLmodels - title: OG-USA - sim_time_limit: 10000 - repo_url: https://github.com/PSLmodels/OG-USA - branch: master - cluster_type: dask - replicas: 2 - resources: - requests: - memory: 7G - cpu: 2 - limits: - memory: 7G - cpu: 2 - env: - {} - # BUILD_NUM: 2 - # INSTALL_NUM: 0 From bd232ffe569525baa2409985080084f4e421c69d Mon Sep 17 00:00:00 2001 From: hdoupe Date: Wed, 6 May 2020 10:51:57 -0400 Subject: [PATCH 02/55] Fix how model resources are specified and expose config for git tag --- src/Publish/index.tsx | 51 +++++++++++++++++-- webapp/apps/billing/tests/test_models.py | 1 - .../apps/comp/tests/test_model_parameters.py | 1 - webapp/apps/conftest.py | 1 - webapp/apps/publish/serializers.py | 8 ++- webapp/apps/publish/tests/test_views.py | 12 +++-- .../migrations/0011_auto_20200506_0928.py | 26 ++++++++++ .../users/migrations/0012_project_repo_tag.py | 16 ++++++ webapp/apps/users/models.py | 7 +-- 9 files changed, 108 insertions(+), 15 deletions(-) create mode 100755 webapp/apps/users/migrations/0011_auto_20200506_0928.py create mode 100755 webapp/apps/users/migrations/0012_project_repo_tag.py diff --git a/src/Publish/index.tsx b/src/Publish/index.tsx index e1ab5710..b9f91e3d 100755 --- a/src/Publish/index.tsx +++ b/src/Publish/index.tsx @@ -48,7 +48,9 @@ interface PublishValues { description: string; oneliner: string; repo_url: string; - server_size: [number, number]; + repo_tag: string; + cpu: number; + memory: number; exp_task_time: number; listed: boolean; } @@ -58,7 +60,9 @@ const initialValues: PublishValues = { description: "", oneliner: "", repo_url: "", - server_size: [4, 2], + repo_tag: "master", + cpu: 2, + memory: 6, exp_task_time: 0, listed: true }; @@ -214,6 +218,21 @@ class PublishForm extends React.Component { /> } />
+            [form inputs for repo_tag, cpu, and memory]