Performance Improvement on the Airtable Interface (#1299) #1343

Closed · wants to merge 13 commits
5 changes: 3 additions & 2 deletions ersilia/cli/commands/fetch.py
@@ -1,4 +1,5 @@
import click
import asyncio
from . import ersilia_cli
from .. import echo
from ...hub.fetch.fetch import ModelFetcher
@@ -9,7 +10,7 @@ def fetch_cmd():
"""Create fetch commmand"""

def _fetch(mf, model_id):
mf.fetch(model_id)
asyncio.run(mf.fetch(model_id))

# Example usage: ersilia fetch {MODEL}
@ersilia_cli.command(
@@ -116,4 +117,4 @@ def fetch(
local_dir=from_dir,
)
_fetch(mf, model_id)
echo(":thumbs_up: Model {0} fetched successfully!".format(model_id), fg="green")
echo(":thumbs_up: Model {0} fetched successfully!".format(model_id), fg="green")
1 change: 0 additions & 1 deletion ersilia/db/hubdata/json_models_interface.py
@@ -10,7 +10,6 @@
class JsonModelsInterface(ErsiliaBase):
def __init__(self, config_json):
ErsiliaBase.__init__(self, config_json=config_json)
# self.cache_dir =
self.json_file_name = MODELS_JSON
self.url = f"https://{ERSILIA_MODEL_HUB_S3_BUCKET}.s3.eu-central-1.amazonaws.com/{MODELS_JSON}"

17 changes: 8 additions & 9 deletions ersilia/hub/fetch/__init__.py
@@ -47,16 +47,15 @@ def _find_url_using_s3_models_json(self, model_id):
def _find_url_using_airtable(self, model_id):
url_field = HOST_URL
identifier_field = IDENTIFIER
for record in self.ai.items_all():
fields = record["fields"]
if fields[identifier_field] == model_id:
if url_field in fields:
return fields[url_field]
else:
self.logger.debug("No hosted URL found for this model in AirTable")
return
self.logger.debug("Model was not found in AirTable")

records_cache = {record["fields"][identifier_field]: record["fields"] for record in self.aio.items_all()}

fields = records_cache.get(model_id)
if fields:
return fields.get(url_field, None)
self.logger.debug("Model was not found in AirTable")
return None

def resolve_valid_hosted_model_url(self, model_id):
"""Resolves the URL of a hosted model

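
The rewritten _find_url_using_airtable above replaces a per-call linear scan over AirTable records with a one-pass dictionary keyed by the model identifier, so lookups become O(1). A small sketch of the same pattern with an in-memory record list standing in for the AirTable client (the field names "Identifier" and "Host URL" are illustrative; the real code uses the IDENTIFIER and HOST_URL constants):

records = [
    {"fields": {"Identifier": "eos3b5e", "Host URL": "https://eos3b5e.example.org"}},
    {"fields": {"Identifier": "eos4e40"}},  # record without a hosted URL
]

# Build the cache once, then answer any number of lookups without re-scanning.
records_cache = {r["fields"]["Identifier"]: r["fields"] for r in records}

def find_hosted_url(model_id, url_field="Host URL"):
    fields = records_cache.get(model_id)
    if fields:
        return fields.get(url_field)  # None if the model has no hosted URL
    return None  # model not found in AirTable

print(find_hosted_url("eos3b5e"))  # https://eos3b5e.example.org
print(find_hosted_url("eos4e40"))  # None
print(find_hosted_url("eos0abc"))  # None
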
20 changes: 11 additions & 9 deletions ersilia/hub/fetch/fetch.py
@@ -156,9 +156,10 @@ def _fetch_not_from_dockerhub(self, model_id):
else:
self.logger.debug("Model already exists in your local, skipping fetching")

def _fetch_from_dockerhub(self, model_id):

async def _fetch_from_dockerhub(self, model_id):
self.logger.debug("Fetching from DockerHub")
self.model_dockerhub_fetcher.fetch(model_id=model_id)
await self.model_dockerhub_fetcher.fetch(model_id=model_id)

def _fetch_from_hosted(self, model_id):
self.logger.debug("Fetching from hosted")
@@ -213,8 +214,8 @@ def exists(self, model_id):
return True
else:
return False
def _fetch(self, model_id):

async def _fetch(self, model_id):

self.logger.debug("Starting fetching procedure")
do_dockerhub = self._decide_if_use_dockerhub(model_id=model_id)
@@ -233,10 +234,11 @@ def _fetch(self, model_id):
self.logger.debug("Fetching in your system, not from DockerHub")
self._fetch_not_from_dockerhub(model_id=model_id)

def fetch(self, model_id):
self._fetch(model_id)
async def fetch(self, model_id):
await self._fetch(model_id)
self.logger.debug("Writing model source to file")
model_source_file = os.path.join(self._model_path(model_id), MODEL_SOURCE_FILE)
path = self._model_path(model_id)
os.makedirs(path, exist_ok=True)
model_source_file = os.path.join(path, MODEL_SOURCE_FILE)
with open(model_source_file, "w") as f:
f.write(self.model_source)
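
Two things change in ModelFetcher.fetch above: it becomes a coroutine that awaits _fetch, and the model directory is created with os.makedirs(..., exist_ok=True) before MODEL_SOURCE_FILE is written, so the write no longer depends on an earlier step having created the folder. A short sketch of that directory-then-write step, using a temporary directory instead of the real EOS path (the file name below is illustrative):

import os
import tempfile

MODEL_SOURCE_FILE = "model_source.txt"  # assumed name, for illustration only

def write_model_source(base_dir, model_id, model_source):
    path = os.path.join(base_dir, model_id)
    # exist_ok=True makes the call idempotent: it succeeds whether or not
    # the model directory already exists.
    os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, MODEL_SOURCE_FILE), "w") as f:
        f.write(model_source)

with tempfile.TemporaryDirectory() as tmp:
    write_model_source(tmp, "eos3b5e", "DockerHub")
    print(os.listdir(os.path.join(tmp, "eos3b5e")))  # ['model_source.txt']
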
71 changes: 35 additions & 36 deletions ersilia/hub/fetch/lazy_fetchers/dockerhub.py
@@ -1,5 +1,6 @@
import os
import json
import asyncio
from ..register.register import ModelRegisterer

from .... import ErsiliaBase, throw_ersilia_exception
@@ -19,10 +20,9 @@
from ....utils.exceptions_utils.fetch_exceptions import DockerNotActiveError
from .. import STATUS_FILE


class ModelDockerHubFetcher(ErsiliaBase):
def __init__(self, overwrite=None, config_json=None):
ErsiliaBase.__init__(self, config_json=config_json, credentials_json=None)
super().__init__(config_json=config_json, credentials_json=None)
self.simple_docker = SimpleDocker()
self.overwrite = overwrite

@@ -42,28 +42,28 @@ def is_available(self, model_id):
return True
return False

def write_apis(self, model_id):
async def write_apis(self, model_id):
self.logger.debug("Writing APIs")
di = PulledDockerImageService(
model_id=model_id, config_json=self.config_json, preferred_port=None
)
di.serve()
di.close()

def _copy_from_bentoml_image(self, model_id, file):
fr_file = "/root/eos/dest/{0}/{1}".format(model_id, file)
to_file = "{0}/dest/{1}/{2}".format(EOS, model_id, file)
self.simple_docker.cp_from_image(
async def _copy_from_bentoml_image(self, model_id, file):
fr_file = f"/root/eos/dest/{model_id}/{file}"
to_file = f"{EOS}/dest/{model_id}/{file}"
await self.simple_docker.cp_from_image(
img_path=fr_file,
local_path=to_file,
org=DOCKERHUB_ORG,
img=model_id,
tag=DOCKERHUB_LATEST_TAG,
)

def _copy_from_ersiliapack_image(self, model_id, file):
fr_file = "/root/{0}".format(file)
to_file = "{0}/dest/{1}/{2}".format(EOS, model_id, file)
async def _copy_from_ersiliapack_image(self, model_id, file):
fr_file = f"/root/{file}"
to_file = f"{EOS}/dest/{model_id}/{file}"
self.simple_docker.cp_from_image(
img_path=fr_file,
local_path=to_file,
@@ -72,30 +72,30 @@ def _copy_from_ersiliapack_image(self, model_id, file):
tag=DOCKERHUB_LATEST_TAG,
)

def _copy_from_image_to_local(self, model_id, file):
async def _copy_from_image_to_local(self, model_id, file):
pack_method = resolve_pack_method_docker(model_id)
if pack_method == PACK_METHOD_BENTOML:
self._copy_from_bentoml_image(model_id, file)
else:
self._copy_from_ersiliapack_image(model_id, file)

def copy_information(self, model_id):
async def copy_information(self, model_id):
self.logger.debug("Copying information file from model container")
self._copy_from_image_to_local(model_id, INFORMATION_FILE)

def copy_metadata(self, model_id):
async def copy_metadata(self, model_id):
self.logger.debug("Copying api_schema_file file from model container")
self._copy_from_image_to_local(model_id, API_SCHEMA_FILE)

def copy_status(self, model_id):
async def copy_status(self, model_id):
self.logger.debug("Copying status file from model container")
self._copy_from_image_to_local(model_id, STATUS_FILE)
def copy_example_if_available(self, model_id):
# TODO This also needs to change to accomodate ersilia pack

async def copy_example_if_available(self, model_id):
# This needs to accommodate ersilia pack
for pf in PREDEFINED_EXAMPLE_FILES:
fr_file = "/root/eos/dest/{0}/{1}".format(model_id, pf)
to_file = "{0}/dest/{1}/{2}".format(EOS, model_id, "input.csv")
fr_file = f"/root/eos/dest/{model_id}/{pf}"
to_file = f"{EOS}/dest/{model_id}/input.csv"
try:
self.simple_docker.cp_from_image(
img_path=fr_file,
@@ -108,12 +108,9 @@ def copy_example_if_available(self, model_id):
except:
self.logger.debug("Could not find example file in docker image")

def modify_information(self, model_id):
async def modify_information(self, model_id):
"""
Modify the information file being copied from docker container to the host machine.
:param file: The model information file being copied.
:param service_class_file: File containing the model service class.
:size_file: File containing the size of the pulled docker image.
"""
information_file = os.path.join(self._model_path(model_id), INFORMATION_FILE)
mp = ModelPuller(model_id=model_id, config_json=self.config_json)
@@ -124,24 +121,26 @@ def modify_information(self, model_id):
self.logger.error("Information file not found, not modifying anything")
return None

# Using this literal here to prevent a file read
# from service class file for a model fetched through DockerHub
# since we already know the service class.
data["service_class"] = "pulled_docker"
data["size"] = mp._get_size_of_local_docker_image_in_mb() # TODO this should probably be a util function
data["size"] = mp._get_size_of_local_docker_image_in_mb()

with open(information_file, "w") as outfile:
json.dump(data, outfile, indent=4)

@throw_ersilia_exception
def fetch(self, model_id):
async def fetch(self, model_id):
mp = ModelPuller(model_id=model_id, config_json=self.config_json)
self.logger.debug("Pulling model image from DockerHub")
mp.pull()
# Asynchronous pulling
await mp.async_pull()
mr = ModelRegisterer(model_id=model_id, config_json=self.config_json)
mr.register(is_from_dockerhub=True)
self.write_apis(model_id)
self.copy_information(model_id)
self.modify_information(model_id)
self.copy_metadata(model_id)
self.copy_status(model_id)
self.copy_example_if_available(model_id)
# Asynchronous and concurrent execution
await asyncio.gather(
mr.register(is_from_dockerhub=True),
self.write_apis(model_id),
self.copy_information(model_id),
self.modify_information(model_id),
self.copy_metadata(model_id),
self.copy_status(model_id),
self.copy_example_if_available(model_id)
)
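
In ModelDockerHubFetcher.fetch above, the image is first pulled with await mp.async_pull(), and the post-pull bookkeeping steps are then awaited together through asyncio.gather so they run concurrently on the event loop instead of strictly one after another. A runnable sketch of that pattern with placeholder coroutines (the step names only mirror the methods in the diff):

import asyncio

async def step(name, delay):
    await asyncio.sleep(delay)  # stand-in for docker cp / serving / file I/O
    return name

async def post_pull(model_id):
    # gather() schedules every coroutine immediately and waits for all of
    # them; the first exception raised by any step propagates to the caller.
    done = await asyncio.gather(
        step("register", 0.2),
        step("write_apis", 0.1),
        step("copy_information", 0.1),
        step("copy_status", 0.1),
    )
    print(model_id, done)

asyncio.run(post_pull("eos3b5e"))

gather only pays off when the steps are independent of each other's outputs; a step that reads a file written by another step would still need explicit ordering.
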
61 changes: 47 additions & 14 deletions ersilia/hub/pull/pull.py
@@ -4,7 +4,8 @@
import json
import os
import re

import asyncio
import aiofiles
from ... import ErsiliaBase
from ...utils.terminal import yes_no_input, run_command
from ... import throw_ersilia_exception
@@ -85,7 +86,7 @@ def _get_size_of_local_docker_image_in_mb(self):
return None

@throw_ersilia_exception
def pull(self):
async def pull(self):
if self.is_available_locally():
if self.overwrite is None:
do_pull = yes_no_input(
@@ -116,29 +117,61 @@ def pull(self):
make_temp_dir(prefix="ersilia-"), "docker_pull.log"
)
self.logger.debug("Keeping logs of pull in {0}".format(tmp_file))
run_command(
"docker pull {0}/{1}:{2} > {3} 2>&1".format(
DOCKERHUB_ORG, self.model_id, DOCKERHUB_LATEST_TAG, tmp_file
)

# Construct the pull command
pull_command = f"docker pull {DOCKERHUB_ORG}/{self.model_id}:{DOCKERHUB_LATEST_TAG} > {tmp_file} 2>&1"

# Use asyncio to run the pull command asynchronously
process = await asyncio.create_subprocess_shell(
pull_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
with open(tmp_file, "r") as f:
pull_log = f.read()

# Wait for the command to complete
stdout, stderr = await process.communicate()

# Handle output
if process.returncode != 0:
self.logger.error(f"Pull command failed: {stderr.decode()}")
raise subprocess.CalledProcessError(process.returncode, pull_command)

self.logger.debug(stdout.decode())

# Reading log asynchronously
async with aiofiles.open(tmp_file, 'r') as f:
pull_log = await f.read()
self.logger.debug(pull_log)

if re.search(r"no match.*manifest", pull_log):
self.logger.warning(
"No matching manifest for image {0}".format(self.model_id)
)
raise DockerConventionalPullError(model=self.model_id)
self.logger.debug("Image pulled succesfully!")

self.logger.debug("Image pulled successfully!")

except DockerConventionalPullError:
self.logger.warning(
"Conventional pull did not work, Ersilia is now forcing linux/amd64 architecture"
)
run_command(
"docker pull {0}/{1}:{2} --platform linux/amd64".format(
DOCKERHUB_ORG, self.model_id, DOCKERHUB_LATEST_TAG
)
# Force platform specification pull command
force_pull_command = f"docker pull {DOCKERHUB_ORG}/{self.model_id}:{DOCKERHUB_LATEST_TAG} --platform linux/amd64"

# Run forced pull asynchronously
process = await asyncio.create_subprocess_shell(
force_pull_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

stdout, stderr = await process.communicate()

if process.returncode != 0:
self.logger.error(f"Forced pull command failed: {stderr.decode()}")
raise subprocess.CalledProcessError(process.returncode, force_pull_command)

self.logger.debug(stdout.decode())
size = self._get_size_of_local_docker_image_in_mb()
if size:
self.logger.debug("Size of image {0} MB".format(size))
@@ -151,4 +184,4 @@ def pull(self):
return size
else:
self.logger.info("Image {0} is not available".format(self.image_name))
raise DockerImageNotAvailableError(model=self.model_id)
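
The pull logic above moves from a blocking run_command call to asyncio.create_subprocess_shell, awaiting process.communicate() and reading the pull log back with aiofiles so neither the subprocess nor the log read blocks the event loop. A compact, self-contained sketch of that pattern (it shells out to a harmless echo rather than docker pull, and raises a plain RuntimeError instead of CalledProcessError):

import asyncio
import aiofiles  # third-party dependency, also used in the diff

async def run_and_log(command, log_path):
    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()  # wait for completion
    if process.returncode != 0:
        raise RuntimeError(f"command failed: {stderr.decode()}")
    # Write and re-read the log without blocking the event loop.
    async with aiofiles.open(log_path, "w") as f:
        await f.write(stdout.decode())
    async with aiofiles.open(log_path, "r") as f:
        return await f.read()

print(asyncio.run(run_and_log("echo pulled image", "pull.log")))
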