From a990909a396acfe2a72f3a0bc01768a899896ff8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Louis-Olivier=20Gu=C3=A9rin?=
Date: Wed, 25 Apr 2018 14:12:52 -0400
Subject: [PATCH 1/6] Make error message more relevant (#19)

---
 server/resources/path.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/server/resources/path.py b/server/resources/path.py
index 31cb6a8..6925979 100644
--- a/server/resources/path.py
+++ b/server/resources/path.py
@@ -7,7 +7,8 @@ from server.common.error_codes_and_messages import (
     ErrorCodeAndMessageFormatter, UNAUTHORIZED, INVALID_PATH, INVALID_ACTION,
     MD5_ON_DIR, LIST_ACTION_ON_FILE, ACTION_REQUIRED, UNEXPECTED_ERROR,
-    PATH_DOES_NOT_EXIST, PATH_IS_DIRECTORY, INVALID_REQUEST)
+    PATH_DOES_NOT_EXIST, PATH_IS_DIRECTORY, INVALID_REQUEST,
+    PATH_DOES_NOT_EXIST)
 from .models.upload_data import UploadDataSchema
 from .models.boolean_response import BooleanResponse
 from .models.path import Path as PathModel
@@ -114,7 +115,7 @@ def delete(self, user, complete_path: str = ''):
         try:
             os.remove(requested_data_path)
         except FileNotFoundError:
-            return marshal(INVALID_PATH), 400
+            return marshal(PATH_DOES_NOT_EXIST), 400
         except OSError:
             return marshal(UNEXPECTED_ERROR), 500
         return Response(status=204)

From 51e5aa1dd07e727048db2e6711198e5defc4fe02 Mon Sep 17 00:00:00 2001
From: simon-dube
Date: Wed, 25 Apr 2018 14:54:43 -0400
Subject: [PATCH 2/6] Get stdout stderr (#17)

* Execution Play first changes
  Include ExecutionProcess DB models and misc. fixes
* Execution Play part 1 and ErrorCodeAndMessage small rework
* Fix test_put_with_invalid_upload_type
* Properly placing the folder deletion
* Fixing input parameter validation
* Execution play continued
* Further progress in Execution Play
  We now load all Boutiques pipelines and transform them to valid CARMIN
  pipelines at server launch.
  TODO: Endpoint /pipelines/reload, accessible to admin only, which would
  reload the Boutiques pipelines
* dockerignore rename
* Fixing functions, dockerignore and path imports
* Fixing execution
  We now have the proper paths for the descriptor and the inputs. We can
  launch an execution on the current thread, but for some reason the mount
  does not appear to work. In fact, I try mounting the execution folder on
  the execution root, but this does not appear to work. More investigation
  will have to be done regarding this.
* Fixing error message
* Execution taking place
  Still need to fix a few things:
  - Due to cwd, the mount gives unwanted results when using ../..
* Pipeline and execution fixes
* WIP First admin executions folder
* Load from id for schema
* New admin creation and begin Database refactor
* Change error_detail to accept any type
* Error details fix
* Readme fix - Invalid JSON
* Fixing 4XX that are not user errors
* Full async execution with execution status to DB
* Only initializing executions can be played
* Update command
* Change output file name from processed to greeting
* Change file name
* Boutiques descriptor get first pass (missing tests)
* Tests written. Still 2 tests to fix.
* Get Boutiques descriptor and test fixes done
* Stdout and stderr first version (WIP)
* First functional version of stdout and stderr
  There is too much code repetition in my opinion. It could be refactored.
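For illustration only (not part of this patch): once these endpoints exist, a client is presumably expected to fetch an execution's stdout and stderr roughly as follows. The /executions/{id}/stdout path and the "apiKey" header are taken from the (commented-out) test added later in this patch; the stderr route is assumed to mirror the stdout one, and the base URL, key value and execution identifier are placeholders.

    import requests

    BASE_URL = "http://localhost:8080"            # placeholder; depends on deployment
    EXECUTION_ID = "some-execution-identifier"    # placeholder
    HEADERS = {"apiKey": "<your-api-key>"}        # header name taken from the test suite

    # Both endpoints return the raw file content as text/plain on success,
    # or a marshalled ErrorCodeAndMessage with HTTP 400 otherwise.
    stdout = requests.get(
        "{}/executions/{}/stdout".format(BASE_URL, EXECUTION_ID), headers=HEADERS)
    stderr = requests.get(
        "{}/executions/{}/stderr".format(BASE_URL, EXECUTION_ID), headers=HEADERS)
    print(stdout.status_code, stdout.text)
    print(stderr.status_code, stderr.text)
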
* stdout test wip * Test progress but still experiencing issues with tests setup * Removing unused import * Removing old code * Putting tests on standby * Commenting out test properly * Refactor stdout and stderr endpoints * Change mimetype to text/plain --- server/resources/execution_stderr.py | 8 ++- server/resources/execution_stdout.py | 8 ++- server/resources/helpers/executions.py | 24 ++++++++- server/resources/helpers/std.py | 27 ++++++++++ server/test/fakedata/executions.py | 11 ++++ .../test/resources/test_execution_stdout.py | 52 +++++++++++++++++++ 6 files changed, 125 insertions(+), 5 deletions(-) create mode 100644 server/resources/helpers/std.py create mode 100644 server/test/resources/test_execution_stdout.py diff --git a/server/resources/execution_stderr.py b/server/resources/execution_stderr.py index 88e9b4d..dbbce59 100644 --- a/server/resources/execution_stderr.py +++ b/server/resources/execution_stderr.py @@ -1,6 +1,10 @@ from flask_restful import Resource +from server.resources.helpers.std import std_file_resource +from server.resources.helpers.executions import STDERR_FILENAME +from server.resources.decorators import login_required class ExecutionStdErr(Resource): - def get(self, execution_identifier): - pass + @login_required + def get(self, user, execution_identifier): + return std_file_resource(user, execution_identifier, STDERR_FILENAME) diff --git a/server/resources/execution_stdout.py b/server/resources/execution_stdout.py index f5a28f7..15cfd3f 100644 --- a/server/resources/execution_stdout.py +++ b/server/resources/execution_stdout.py @@ -1,6 +1,10 @@ from flask_restful import Resource +from server.resources.helpers.std import std_file_resource +from server.resources.helpers.executions import STDOUT_FILENAME +from server.resources.decorators import login_required class ExecutionStdOut(Resource): - def get(self, execution_identifier): - pass + @login_required + def get(self, user, execution_identifier): + return std_file_resource(user, execution_identifier, STDOUT_FILENAME) diff --git a/server/resources/helpers/executions.py b/server/resources/helpers/executions.py index 39c6613..0f701a9 100644 --- a/server/resources/helpers/executions.py +++ b/server/resources/helpers/executions.py @@ -11,7 +11,8 @@ from server.common.error_codes_and_messages import ( UNAUTHORIZED, INVALID_INPUT_FILE, INVALID_PATH, INVALID_MODEL_PROVIDED, INVALID_PIPELINE_IDENTIFIER, EXECUTION_IDENTIFIER_MUST_NOT_BE_SET, - INVALID_QUERY_PARAMETER, UNEXPECTED_ERROR, ErrorCodeAndMessageFormatter) + INVALID_QUERY_PARAMETER, PATH_DOES_NOT_EXIST, UNEXPECTED_ERROR, + ErrorCodeAndMessageFormatter) from server.resources.models.execution import Execution from server.resources.helpers.pipelines import get_pipeline @@ -19,6 +20,9 @@ EXECUTIONS_DIRNAME = "executions" ABSOLUTE_PATH_INPUTS_FILENAME = "inputs_abs.json" +STDOUT_FILENAME = "stdout.txt" +STDERR_FILENAME = "stderr.txt" + def create_user_executions_dir(username: str): user_execution_dir = os.path.join( @@ -199,6 +203,24 @@ def filter_executions(executions, offset, limit): return executions, None +def std_file_path(username: str, execution_identifier: str, + filename: str) -> str: + return os.path.join( + get_user_data_directory(username), EXECUTIONS_DIRNAME, + execution_identifier, filename) + + +def get_std_file(username: str, execution_identifier: str, + filename: str) -> (str, ErrorCodeAndMessage): + file_path = std_file_path(username, execution_identifier, filename) + + try: + with open(file_path) as f: + return f.read(), None + except 
OSError: + return None, PATH_DOES_NOT_EXIST + + from .path import (create_directory, get_user_data_directory, is_safe_path, is_data_accessible, platform_path_exists, path_from_data_dir) diff --git a/server/resources/helpers/std.py b/server/resources/helpers/std.py new file mode 100644 index 0000000..d86f99d --- /dev/null +++ b/server/resources/helpers/std.py @@ -0,0 +1,27 @@ +from flask import Response +from server.database import db +from server.common.utils import marshal +from server.common.error_codes_and_messages import ( + ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND) +from server.database.queries.executions import get_execution +from server.resources.helpers.executions import (get_execution_as_model, + get_std_file, STDERR_FILENAME) +from server.resources.decorators import login_required + + +def std_file_resource(user, execution_identifier, path_to_file): + execution_db = get_execution(execution_identifier, db.session) + if not execution_db: + error = ErrorCodeAndMessageFormatter(EXECUTION_NOT_FOUND, + execution_identifier) + return marshal(error), 400 + execution, error = get_execution_as_model(user.username, execution_db) + if error: + return marshal(error), 400 + + std, error = get_std_file(user.username, execution_identifier, + path_to_file) + if error: + return marshal(error), 400 + + return Response(std, mimetype='text/plain') diff --git a/server/test/fakedata/executions.py b/server/test/fakedata/executions.py index f563c72..e69e300 100644 --- a/server/test/fakedata/executions.py +++ b/server/test/fakedata/executions.py @@ -1,6 +1,17 @@ from server.resources.models.execution import Execution +from server.database.models.execution import Execution as ExecutionDB, ExecutionStatus from server.test.fakedata.users import standard_user + +def execution_for_db(execution_id: str, username: str) -> ExecutionDB: + return ExecutionDB( + identifier=execution_id, + name="valid_execution", + pipeline_identifier="pipeline1", + status=ExecutionStatus.Finished, + creator_username=username) + + POST_VALID_EXECUTION = Execution( name="valid_execution", pipeline_identifier="pipeline1", diff --git a/server/test/resources/test_execution_stdout.py b/server/test/resources/test_execution_stdout.py new file mode 100644 index 0000000..55c3a80 --- /dev/null +++ b/server/test/resources/test_execution_stdout.py @@ -0,0 +1,52 @@ +import pytest +import copy +import os +import json +from server.test.utils import load_json_data, error_from_response +from server.test.conftest import test_client, session +from server.test.fakedata.users import standard_user +from server import app +from server.config import TestConfig +from server.resources.models.error_code_and_message import ErrorCodeAndMessageSchema +from server.common.error_codes_and_messages import INVALID_PIPELINE_IDENTIFIER +from server.resources.models.pipeline import PipelineSchema +from server.test.fakedata.executions import execution_for_db + +EXECUTION_IDENTIFIER = "test-execution-identifier" + + +@pytest.fixture(autouse=True) +def user_execution_folder(tmpdir_factory): + root_directory = tmpdir_factory.mktemp('data') + subdir = root_directory.mkdir(standard_user().username) + user_executions = subdir.mkdir('executions') + execution_folder = user_executions.mkdir(EXECUTION_IDENTIFIER) + app.config['DATA_DIRECTORY'] = str(root_directory) + + return execution_folder + + +@pytest.fixture(autouse=True) +def data_creation(session): + user = standard_user(encrypted=True) + session.add(user) + session.commit() + + execution = 
execution_for_db(EXECUTION_IDENTIFIER, user.username) + session.add(execution) + session.commit() + + +class TestExecutionStdOutResource(): + def test_get_execution_std_out_by_identifier(self, test_client, + user_execution_folder): + pass + # simple_stdout_text = "This is stdout content" + # user_execution_folder.join("stdout.txt").write(simple_stdout_text) + + # response = test_client.get( + # '/executions/{}/stdout'.format(EXECUTION_IDENTIFIER), + # headers={ + # "apiKey": standard_user().api_key + # }) + # assert response.data.decode('utf8') == simple_stdout_text From 4405d56534c37f2245500827f6ddd623695248ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Louis-Olivier=20Gu=C3=A9rin?= Date: Wed, 25 Apr 2018 15:34:37 -0400 Subject: [PATCH 3/6] Fix logger crash on 404 (#18) * Fix 500 internal server error when logging 404 * Remove error * Log requests as info --- server/common/error_codes_and_messages.py | 1 + server/logging/config.py | 4 +++- server/logging/setup.py | 26 +++++++++++------------ 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/server/common/error_codes_and_messages.py b/server/common/error_codes_and_messages.py index 5a3bae6..1df88ce 100644 --- a/server/common/error_codes_and_messages.py +++ b/server/common/error_codes_and_messages.py @@ -80,3 +80,4 @@ def errors_as_list() -> List: INVALID_INVOCATION = ErrorCodeAndMessage(130, "Invalid invocation") CANNOT_REPLAY_EXECUTION = ErrorCodeAndMessage( 135, "An execution cannot be replayed. Current status: '{}'") +PAGE_NOT_FOUND = ErrorCodeAndMessage(404, "Page Not Found") diff --git a/server/logging/config.py b/server/logging/config.py index b978d50..07b940a 100644 --- a/server/logging/config.py +++ b/server/logging/config.py @@ -42,11 +42,13 @@ }, 'loggers': { 'request-response': { + 'level': 'INFO', 'propagate': False, 'handlers': ['request-context'] }, 'server-error': { - 'propagata': False, + 'level': 'WARNING', + 'propagate': False, 'handlers': ['unexpected-crash'] } } diff --git a/server/logging/setup.py b/server/logging/setup.py index 5a73289..0c6db5f 100644 --- a/server/logging/setup.py +++ b/server/logging/setup.py @@ -2,16 +2,23 @@ import traceback import logging import logging.config -from flask import g, request, got_request_exception +from flask import g, request, got_request_exception, jsonify from server import app from server.logging.config import LOGGING_CONFIG +from server.common.error_codes_and_messages import ErrorCodeAndMessageMarshaller, PAGE_NOT_FOUND from server.test.utils import error_from_response logging.config.dictConfig(LOGGING_CONFIG) - # Using this instead of @app.errorhandler since flask-restful does not support # the decorator. 
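# Illustration, not part of the patch: the 404 handler added just below means
# an unknown route now gets a marshalled ErrorCodeAndMessage body instead of
# crashing the request logger. The exact JSON key names emitted by
# ErrorCodeAndMessageMarshaller are not shown in this series; the snake_case
# names in the trailing comment are a guess based on the model's error_code /
# error_message fields, and the base URL is a placeholder.
import requests

response = requests.get("http://localhost:8080/no/such/route")
assert response.status_code == 404
# Body, approximately: {"error_code": 404, "error_message": "Page Not Found"}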
+ + +@app.errorhandler(404) +def page_not_found(e): + return jsonify(ErrorCodeAndMessageMarshaller(PAGE_NOT_FOUND)), 404 + + def log_exception(sender, exception, **extra): logger = logging.getLogger('server-error') logger.error(traceback.format_exc()) @@ -27,26 +34,19 @@ def log_response(response): server_error = re.compile("^5").match(response.status) if user_error or server_error: - error = error_from_response(response) request_content = request.get_json() if request_content and request_content.get('password'): request_content['password'] = '[password]' if user_error: - if error: - msg = '\n User: - IP: {}, username: {}\n Method - {} {}\n Request - {}\n Response - {}: {}'.format( - request.remote_addr, - g.get('username'), request.method, request.path, - request.get_json(), error.error_code, error.error_message) - else: - msg = '\n User: - IP: {}, username: {}\n Method - {} {}\n Request - {}\n Response - {}'.format( - request.remote_addr, g.get('username'), request.method, - request.path, request.get_json(), response.status) + msg = '\n User: - IP: {}, username: {}\n Method - {} {}\n Request - {}\n Response - {}'.format( + request.remote_addr, g.get('username'), request.method, + request.path, request_content, response.get_data()) logger.warning(msg) elif server_error: msg = '\n User: - {}\n Method - {} {}\n Request - {}\n Response - {}'.format( request.remote_addr, request.method, request.path, - request.get_json(), response.get_data()) + request_content, response.get_data()) logger.error(msg) else: msg = '{} - {} - {}'.format(request.remote_addr, request.path, From 09260ade4bed28335257253eef48605a3a071ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Louis-Olivier=20Gu=C3=A9rin?= Date: Mon, 30 Apr 2018 19:07:48 -0400 Subject: [PATCH 4/6] Execution play continue (#21) * Solving async process remaining alive * Add descriptor factory and ABC * Execution successful * Implement boutiques concrete class * Use abstract factory to export pipelines * Return False on error * Timeout management and errors on insert * Use abstract factory for execute * Pipeline timeout and DB better management * Preventive commit * Changes for the demo * Remove print execution kill and add comment to execution post * Get original descriptor also return the type with the path * Fix rename and Added descriptor to DB Execution model * Missing comma * Fixing pipeline test setup * Fix tuple None * Descriptor copied to execution folder * Test modified. 
Taking into account the Descriptor copy and inputs file * Execution using local descriptor and Abstract descriptor class * Should work entirely for the purge * Changed purged execution status to Unknown * Added forgotten import * Create directories for all supported descriptors * Begin get results and move all carmin files to execution subfolder * Move carmin related files to a subfolder of the execution * Make carmin-files directory hidden * Safety push (VM is getting slow) * Make pipeline identifier path to pipeline * Added execution id to Path * Write absolute paths file to .carmin-files * Added "Corrupted" error message for execution * Removing error line * Two quick fixes * Removed useless pass, added execution delete and more comments * Fix process killing with docker * Make call to sh kill instead of using python kill * Exit code management * Fixed tests executions * Clean up startup validation * Continue test stdout * Add tests for execution play * Std out tests v1 * Fix wrong merge * Make copy of execution * stdout and stderr tests * Fixed ExecutionStatus representation in error messages * Start work on play tests * Before executions tests overhaul * First working version of the tests reworked * Making all execuion that must have the right id functions * Changing execution fakedata functions to snake case * Quick fix to PipelineStub * Continue work on execution play test * Fix testing config app object * Fix 500 error on model validate * Moving invocation validation to POST executions * Two first execution results tests * Test result success progress * Success execution result * Enforce maxLimitListExecutions and simplify get * Add returnedFiles to execution model * Add fix for Content-Length == 3 * Set inputs to error if inconsistent state * Changed get execution results security to allow admin * Return error code and message on invalid pipeline * Execution kill user access * Fixing kill condition for execution * Fixing error code return * Catch error on output_files * Add docker to services --- .travis.yml | 5 + server/__init__.py | 1 + server/common/error_codes_and_messages.py | 23 +- server/config.py | 6 - server/database/app.db | Bin 24576 -> 0 bytes server/database/models/execution.py | 3 + server/database/models/execution_process.py | 7 +- server/database/queries/executions.py | 12 +- server/platform_properties.py | 2 +- server/resources/execution.py | 60 ++++- server/resources/execution_kill.py | 40 +++- server/resources/execution_play.py | 70 +++--- server/resources/execution_results.py | 35 ++- server/resources/executions.py | 90 ++++++-- server/resources/helpers/execution.py | 129 +++-------- server/resources/helpers/execution_kill.py | 50 +++++ server/resources/helpers/execution_play.py | 122 +++++++++++ server/resources/helpers/execution_results.py | 28 +++ server/resources/helpers/executions.py | 154 ++++++++----- server/resources/helpers/pathnames.py | 7 + server/resources/helpers/pipelines.py | 56 +++-- server/resources/helpers/std.py | 13 +- .../resources/models/descriptor/__init__.py | 0 .../resources/models/descriptor/boutiques.py | 39 ++++ .../models/descriptor/descriptor_abstract.py | 35 +++ .../descriptor/supported_descriptors.py | 7 + server/resources/models/execution.py | 6 + server/resources/models/path.py | 6 +- server/resources/path.py | 3 +- server/resources/pipeline.py | 3 +- .../resources/pipeline_boutiquesdescriptor.py | 6 +- server/resources/post_processors.py | 14 ++ server/startup_validation.py | 85 +++++-- server/test/conftest.py | 5 +- 
server/test/database/app.db | Bin 24576 -> 32768 bytes server/test/fakedata/executions.py | 72 +++--- server/test/fakedata/pipelines.py | 207 +++++++++++++++++- server/test/resources/test_execution.py | 51 +++-- server/test/resources/test_execution_play.py | 94 ++++++++ .../test/resources/test_execution_results.py | 118 ++++++++++ .../test/resources/test_execution_stderr.py | 88 ++++++++ .../test/resources/test_execution_stdout.py | 100 ++++++--- server/test/resources/test_executions.py | 107 ++++----- .../test/resources/test_executions_count.py | 10 +- .../test_pipeline_boutiquesdescriptor.py | 10 +- server/test/resources/test_platform.py | 6 - setup.py | 3 +- 47 files changed, 1561 insertions(+), 427 deletions(-) create mode 100644 server/resources/helpers/execution_kill.py create mode 100644 server/resources/helpers/execution_play.py create mode 100644 server/resources/helpers/execution_results.py create mode 100644 server/resources/helpers/pathnames.py create mode 100644 server/resources/models/descriptor/__init__.py create mode 100644 server/resources/models/descriptor/boutiques.py create mode 100644 server/resources/models/descriptor/descriptor_abstract.py create mode 100644 server/resources/models/descriptor/supported_descriptors.py create mode 100644 server/resources/post_processors.py create mode 100644 server/test/resources/test_execution_play.py create mode 100644 server/test/resources/test_execution_results.py create mode 100644 server/test/resources/test_execution_stderr.py diff --git a/.travis.yml b/.travis.yml index 66309c8..4ec04fe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,10 @@ +sudo: required + language: python +services: + - docker + python: - 3.4 - 3.5 diff --git a/server/__init__.py b/server/__init__.py index aeb528e..a16f578 100644 --- a/server/__init__.py +++ b/server/__init__.py @@ -4,6 +4,7 @@ app = create_app() from server.logging.setup import log_response, log_exception +from server.resources.post_processors import * def main(): diff --git a/server/common/error_codes_and_messages.py b/server/common/error_codes_and_messages.py index 1df88ce..3ef70c4 100644 --- a/server/common/error_codes_and_messages.py +++ b/server/common/error_codes_and_messages.py @@ -77,7 +77,28 @@ def errors_as_list() -> List: 120, "Invalid value '{}' for query parameter '{}'.") CANNOT_MODIFY_PARAMETER = ErrorCodeAndMessage( 125, "'{}' cannot be modified on an existing Execution.") -INVALID_INVOCATION = ErrorCodeAndMessage(130, "Invalid invocation") +INVOCATION_INITIALIZATION_FAILED = ErrorCodeAndMessage( + 130, + "The execution was created with identifier '{}', but its initialization failed during the invocation validation." +) CANNOT_REPLAY_EXECUTION = ErrorCodeAndMessage( 135, "An execution cannot be replayed. Current status: '{}'") +INVALID_EXECUTION_TIMEOUT = ErrorCodeAndMessage( + 140, "Invalid execution timeout. Must be between {} and {} seconds.") +CANNOT_KILL_NOT_RUNNING_EXECUTION = ErrorCodeAndMessage( + 145, "Cannot kill a non running execution. Current status: '{}'") +CANNOT_KILL_FINISHING_EXECUTION = ErrorCodeAndMessage( + 150, + "The execution processes are not running, thus the execution is most probably finishing and cannot be killed." +) +CANNOT_GET_RESULT_NOT_COMPLETED_EXECUTION = ErrorCodeAndMessage( + 155, + "The execution is not done yet, thus results cannot be queried. Current status: '{}'. Please try again later." +) +CORRUPTED_EXECUTION = ErrorCodeAndMessage( + 160, + "There is an unrecoverable problem with this execution. Please create a new execution." 
+) +UNSUPPORTED_DESCRIPTOR_TYPE = ErrorCodeAndMessage( + 165, "The descriptor type '{}' is not supported.") PAGE_NOT_FOUND = ErrorCodeAndMessage(404, "Page Not Found") diff --git a/server/config.py b/server/config.py index a4ef04e..c52357c 100644 --- a/server/config.py +++ b/server/config.py @@ -1,12 +1,6 @@ import os basedir = os.path.abspath(os.path.dirname(__file__)) -SUPPORTED_PROTOCOLS = ["http", "https", "ftp", "sftp", "ftps", "scp", "webdav"] - -SUPPORTED_MODULES = [ - "Processing", "Data", "AdvancedData", "Management", "Commercial" -] - DEFAULT_PROD_DB_URI = os.path.join(basedir, 'database/app.db') SQLITE_DEFAULT_PROD_DB_URI = 'sqlite:///{}'.format(DEFAULT_PROD_DB_URI) diff --git a/server/database/app.db b/server/database/app.db index fd2d575b1667b6c7d517313639811fa23e6ffb22..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 GIT binary patch literal 0 HcmV?d00001 literal 24576 zcmeI4OKclO7=YJFlQyAg$|DK_LH7WP10C4M?9NV6q^7CUMhPi#lcI9Kn%B}LiJjVO zwFNE+gvuQxkhoB!(sHT9p+ZQ!Dvm`Q5qIDM2PA~(k$>Gd-IT^PhvxKqkK+q-Nowr4R$mpV?OS`TLcuy{+m>aGz`Gyb?N<*JcCM-6JvdMmtLBbhtR-t^C8?UqT>xpaYHH2Yo|#`P9VwMFW+N48v(cs&J!gDVmyNWk z&)or=t!nD^TD_ub+S~@Lrxh*WU?|s(sp|jKsk#v<5Sk5B-*VGhjf}3qK~)a7A{uF> zxpsRmFO+AVpDdrWXGg=V$7UN@aolg25dv+9lZB8&C3NLFsvm)haKRkc)IuGY>} zv$bZEm8{)(Yk|)#6_~5wP*FRd?SFn<* zHfI~BSKvUjBARKf^|`V!!z%7ZvH#7quyb@|r0~{kYnEoP?A^0zZr2)dzNgP zA#0-1I8&?ZJAPTOt!&KLVz7B?iM6D%Y|gdU%VM-Ce(hdwk6w$p9S!la|wL zwRNLQ^D_%eCA+xZ6POz7?H_$^WGhqIsueH$yJM{9t_|I+VK3}cc4hmOq3q9I%Nl+! zE8q(apaC?12G9T+Km%w14WI!ufCkV28rUuauNL}8-@JcxqLmw1lgm2x4m3^)_kim- z&J%NomJcP*#5 zE>E93vAB53KfEw8F_>*scKrX(c5NN@g9gw58bAYR01co4G=K)s02)98XaEgtlYw2m zqXVtHz{c_aFWb~Jc7q1c02)98XaEhM0W^RH&;S}h184vZ+!F(j_D&3Jb^gD7{r}*f zotJwuFodE3G=K)s02)98XaEhMfqQJ=*xsR~kq37@nRPSTXGO~6u^aeAxl$48s)z`e zxx|&R!;K82h`iJ^Qn(L(<5p?7jkcaa^tp+-g|@&T2{1zg3gcn9F+4^{!lazNh?jT=Z# z0vUvaMxG#1Hg^II*M&>TjMklka`}NH@PN19a=^e*fr+^!!V%!mgq%nZLTwm>--)0Q zm`=b;R}TX4{Twj%sH;Pz2y;1%FpeN79)(|utF)j1r5@|B$(J6X06d%nnn1WZl7<9~ zYC@^!lSpacE5%I!lb-o;2jHi>&ci<8uN*Lz4oo$ni5D{n3S3|h(+2hx5nNdxLi}T? zO$Xq`k1hl7LJmkHraVtG!a|=B>cue;KGTE+#waJ?VI27#fFFMizZ7dB2NZ5VeW^m? 
zI4*@KhnnzMO5%i^X+wQ4V4bGT*15p3f*L#;GXQBxSMR!Ff^R#CQGyj8a~ub_6N=h7k=+1tFs## zUh5`Xb00MPu$wFl9F7*&y2-8#K*J;5WKG~HcZJ8h$%gENDE{3|mLq#6{`GFM7k$w1 flWwvS*;Dmj>n7WfJ+^P5n=C^1to~v**@1rnJu#uh diff --git a/server/database/models/execution.py b/server/database/models/execution.py index fefd48a..e4fb89c 100644 --- a/server/database/models/execution.py +++ b/server/database/models/execution.py @@ -21,6 +21,7 @@ class Execution(db.Model): identifier (str): name (str): pipeline_identifier (str): + descriptor (str): timeout (int): status (ExecutionStatus): study_identifier (str): @@ -33,6 +34,7 @@ class Execution(db.Model): identifier (str): name (str): pipeline_identifier (str): + descriptor (str): timeout (int): status (ExecutionStatus): study_identifier (str): @@ -45,6 +47,7 @@ class Execution(db.Model): identifier = Column(String, primary_key=True, default=execution_uuid) name = Column(String, nullable=False) pipeline_identifier = Column(String, nullable=False) + descriptor = Column(String, nullable=False) timeout = Column(Integer) status = Column(Enum(ExecutionStatus), nullable=False) study_identifier = Column(String) diff --git a/server/database/models/execution_process.py b/server/database/models/execution_process.py index 326a1f9..bcdd8cb 100644 --- a/server/database/models/execution_process.py +++ b/server/database/models/execution_process.py @@ -1,5 +1,5 @@ from flask_restful import fields -from sqlalchemy import Column, String, Integer, ForeignKey +from sqlalchemy import Column, String, Integer, Boolean, ForeignKey from server.database import db @@ -9,12 +9,15 @@ class ExecutionProcess(db.Model): Args: execution_identifier (str): pid (int): + is_execution(bool): Attributes: execution_identifier (str): pid (int): + is_execution (bool): """ execution_identifier = Column( String, ForeignKey("execution.identifier"), primary_key=True) - pid = Column(Integer, nullable=False) + pid = Column(Integer, primary_key=True) + is_execution = Column(Boolean, nullable=False) diff --git a/server/database/queries/executions.py b/server/database/queries/executions.py index 7285688..4fd8e4f 100644 --- a/server/database/queries/executions.py +++ b/server/database/queries/executions.py @@ -1,12 +1,14 @@ from typing import List from server.database.models.execution import Execution +from server.database.models.execution_process import ExecutionProcess -def get_all_executions_for_user(username: str, db_session) -> List[Execution]: +def get_all_executions_for_user(username: str, limit: int, offset: int, + db_session) -> List[Execution]: return list( db_session.query(Execution).filter( Execution.creator_username == username).order_by( - Execution.created_at.desc())) + Execution.created_at.desc()).offset(offset).limit(limit)) def get_execution(identifier: str, db_session) -> Execution: @@ -16,3 +18,9 @@ def get_execution(identifier: str, db_session) -> Execution: def get_execution_count_for_user(username: str, db_session) -> int: return db_session.query(Execution).filter( Execution.creator_username == username).count() + + +def get_execution_processes(execution_identifier: str, + db_session) -> List[ExecutionProcess]: + return db_session.query(ExecutionProcess).filter( + ExecutionProcess.execution_identifier == execution_identifier).all() diff --git a/server/platform_properties.py b/server/platform_properties.py index 7a0df9c..a0d1961 100644 --- a/server/platform_properties.py +++ b/server/platform_properties.py @@ -16,7 +16,7 @@ "platformDescription": "A lightweight implementation of the CARMIN 0.3 API Specification", 
"minAuthorizedExecutionTimeout": - 128, + 2, "maxAuthorizedExecutionTimeout": 0, "defaultExecutionTimeout": diff --git a/server/resources/execution.py b/server/resources/execution.py index cc6d53c..6df3be1 100644 --- a/server/resources/execution.py +++ b/server/resources/execution.py @@ -1,10 +1,15 @@ -from flask_restful import Resource +from flask_restful import Resource, request, inputs from server.database import db -from server.database.queries.executions import get_execution +from server.database.models.execution import ExecutionStatus, current_milli_time +from server.database.queries.executions import get_execution, get_execution_processes from server.common.error_codes_and_messages import ( - ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND, CANNOT_MODIFY_PARAMETER) -from server.resources.models.execution import ExecutionSchema -from server.resources.helpers.executions import get_execution_as_model + ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND, CANNOT_MODIFY_PARAMETER, + UNAUTHORIZED, CANNOT_KILL_FINISHING_EXECUTION, + CANNOT_KILL_NOT_RUNNING_EXECUTION) +from server.resources.models.execution import ExecutionSchema, EXECUTION_COMPLETED_STATUSES +from server.resources.helpers.executions import ( + get_execution_as_model, get_execution_dir, delete_execution_directory) +from server.resources.helpers.execution_kill import kill_all_execution_processes from server.resources.decorators import (login_required, marshal_response, unmarshal_request) @@ -43,5 +48,46 @@ def put(self, model, execution_identifier, user): db.session.add(execution_db) db.session.commit() - def delete(self, execution_identifier): - pass + @login_required + @marshal_response() + def delete(self, user, execution_identifier): + execution_db = get_execution(execution_identifier, db.session) + if not execution_db: + return ErrorCodeAndMessageFormatter(EXECUTION_NOT_FOUND, + execution_identifier) + if execution_db.creator_username != user.username: + return UNAUTHORIZED + + deleteFiles = request.args.get( + 'deleteFiles', default=False, type=inputs.boolean) + + # Get all the execution running processes + execution_processes = get_execution_processes(execution_identifier, + db.session) + + # Given delete is called to perform only a kill and encounter the same situation as kill, we do not kill the processes + # See execution_kill for more information + if execution_db.status == ExecutionStatus.Running and not execution_processes and not deleteFiles: + return CANNOT_KILL_FINISHING_EXECUTION + + if execution_db.status != ExecutionStatus.Running and not deleteFiles: + return ErrorCodeAndMessageFormatter( + CANNOT_KILL_NOT_RUNNING_EXECUTION, execution_db.status.name) + + # Kill all the execution processed + kill_all_execution_processes(execution_processes) + for execution_process in execution_processes: + db.session.delete(execution_process) + # If the execution is not in a completed status, we mark it as killed + if execution_db.status == ExecutionStatus.Running: + execution_db.status = ExecutionStatus.Killed + execution_db.end_date = current_milli_time() + db.session.commit() + + # Free all resources associated with the execution if delete files is True + if deleteFiles: + execution_dir = get_execution_dir(user.username, + execution_identifier) + delete_execution_directory(execution_dir) + db.session.delete(execution_db) + db.session.commit() diff --git a/server/resources/execution_kill.py b/server/resources/execution_kill.py index 69f1472..a2c1c65 100644 --- a/server/resources/execution_kill.py +++ 
b/server/resources/execution_kill.py @@ -1,6 +1,42 @@ from flask_restful import Resource +from server.common.error_codes_and_messages import ( + ErrorCodeAndMessageFormatter, ErrorCodeAndMessageAdditionalDetails, + EXECUTION_NOT_FOUND, UNAUTHORIZED, UNEXPECTED_ERROR, + CANNOT_KILL_NOT_RUNNING_EXECUTION, CANNOT_KILL_FINISHING_EXECUTION) +from server.database import db +from server.database.queries.executions import get_execution, get_execution_processes +from server.database.models.execution import Execution, ExecutionStatus, current_milli_time +from server.resources.decorators import login_required, marshal_response +from server.resources.helpers.execution_kill import kill_all_execution_processes class ExecutionKill(Resource): - def put(self, execution_identifier): - pass + @login_required + @marshal_response() + def put(self, user, execution_identifier): + execution_db = get_execution(execution_identifier, db.session) + if not execution_db: + return ErrorCodeAndMessageFormatter(EXECUTION_NOT_FOUND, + execution_identifier) + if user.role != Role.admin and execution_db.creator_username != user.username: + return UNAUTHORIZED + + if execution_db.status != ExecutionStatus.Running: + return ErrorCodeAndMessageFormatter( + CANNOT_KILL_NOT_RUNNING_EXECUTION, execution_db.status.name) + + # Look at its running processes + execution_processes = get_execution_processes(execution_identifier, + db.session) + + if not execution_processes: # Most probably due to the execution being in termination process + return CANNOT_KILL_FINISHING_EXECUTION + + kill_all_execution_processes(execution_processes) + + # Mark the execution as "Killed" and delete the execution processes + execution_db.status = ExecutionStatus.Killed + execution_db.end_date = current_milli_time() + for execution_process in execution_processes: + db.session.delete(execution_process) + db.session.commit() diff --git a/server/resources/execution_play.py b/server/resources/execution_play.py index 495cf35..e9246ac 100644 --- a/server/resources/execution_play.py +++ b/server/resources/execution_play.py @@ -1,5 +1,5 @@ +import os from flask_restful import Resource, request -from boutiques import bosh from jsonschema import ValidationError from server.database import db from server.database.queries.executions import get_execution @@ -7,12 +7,12 @@ from server.database.models.execution import Execution, ExecutionStatus from server.common.error_codes_and_messages import ( ErrorCodeAndMessageFormatter, ErrorCodeAndMessageAdditionalDetails, - EXECUTION_NOT_FOUND, UNAUTHORIZED, UNEXPECTED_ERROR, INVALID_INVOCATION, - CANNOT_REPLAY_EXECUTION) + EXECUTION_NOT_FOUND, UNAUTHORIZED, CORRUPTED_EXECUTION, UNEXPECTED_ERROR, + CANNOT_REPLAY_EXECUTION, UNSUPPORTED_DESCRIPTOR_TYPE) from server.resources.helpers.executions import ( - get_execution_as_model, get_execution_dir, create_absolute_path_inputs) -from server.resources.helpers.execution import start_execution -from server.resources.helpers.pipelines import get_original_descriptor_path + get_execution_as_model, get_descriptor_path, get_absolute_path_inputs_path) +from server.resources.helpers.execution_play import start_execution +from server.resources.models.descriptor.descriptor_abstract import Descriptor class ExecutionPlay(Resource): @@ -28,40 +28,36 @@ def put(self, user, execution_identifier): if execution_db.status != ExecutionStatus.Initializing: return ErrorCodeAndMessageFormatter(CANNOT_REPLAY_EXECUTION, - execution_db.status) + execution_db.status.name) execution, error = 
get_execution_as_model(user.username, execution_db) if error: + return CORRUPTED_EXECUTION + + # Get the descriptor path + descriptor_path = get_descriptor_path(user.username, + execution.identifier) + + # Get appriopriate descriptor object + descriptor = Descriptor.descriptor_factory_from_type( + execution_db.descriptor) + + if not descriptor: + # We don't have any descriptor defined for this pipeline type + logger = logging.getLogger('server-error') + logger.error( + "Unsupported descriptor type extracted from file at {}".format( + descriptor_path)) + return ErrorCodeAndMessageFormatter(UNSUPPORTED_DESCRIPTOR_TYPE, + execution_db.descriptor) + + modified_inputs_path = get_absolute_path_inputs_path( + user.username, execution.identifier) + if not os.path.isfile(modified_inputs_path): + logger = logging.getLogger('server-error') + logger.error("Absolute path inputs file not found at {}".format( + descriptor_path)) return UNEXPECTED_ERROR - # Get the boutiques descriptor path - boutiques_descriptor_path, error = get_original_descriptor_path( - execution.pipeline_identifier) - if error: - return error - - # Create a version of the inputs file with correct links - modified_inputs_path, error = create_absolute_path_inputs( - user.username, execution.identifier, execution.pipeline_identifier, - request.url_root) - - if error: - return UNEXPECTED_ERROR - - # We are ready to start the execution - # First, let's validate it using invocation - try: - bosh([ - "invocation", boutiques_descriptor_path, "-i", - modified_inputs_path - ]) - except ValidationError as e: - execution_db.status = ExecutionStatus.InitializationFailed - db.session.commit() - return ErrorCodeAndMessageAdditionalDetails( - INVALID_INVOCATION, e.message) - # The execution is valid and we are now ready to start it - start_execution(user, execution, boutiques_descriptor_path, - modified_inputs_path) - pass + start_execution(user, execution, descriptor, modified_inputs_path) diff --git a/server/resources/execution_results.py b/server/resources/execution_results.py index 926810c..7b2d5fc 100644 --- a/server/resources/execution_results.py +++ b/server/resources/execution_results.py @@ -1,6 +1,37 @@ from flask_restful import Resource +from server.database import db +from server.database.queries.executions import get_execution +from server.database.models.execution import Execution as ExecutionDB, ExecutionStatus +from server.database.models.user import User, Role +from server.resources.models.execution import EXECUTION_COMPLETED_STATUSES +from server.resources.decorators import login_required, marshal_response +from server.common.error_codes_and_messages import ( + ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND, UNAUTHORIZED, + CANNOT_GET_RESULT_NOT_COMPLETED_EXECUTION) +from server.resources.helpers.execution_results import get_output_files +from server.resources.helpers.executions import is_safe_for_get +from server.resources.models.path import PathSchema class ExecutionResults(Resource): - def get(self, execution_identifier): - pass + @login_required + @marshal_response(PathSchema(many=True)) + def get(self, user, execution_identifier): + execution_db = get_execution(execution_identifier, db.session) + if not execution_db: + return ErrorCodeAndMessageFormatter(EXECUTION_NOT_FOUND, + execution_identifier) + if not is_safe_for_get(user, execution_db): + return UNAUTHORIZED + + if execution_db.status not in EXECUTION_COMPLETED_STATUSES: + return ErrorCodeAndMessageFormatter( + CANNOT_GET_RESULT_NOT_COMPLETED_EXECUTION, + 
execution_db.status.name) + + # We now know the execution has completed and can retrieve the output files + output_files, error = get_output_files(execution_db.creator_username, + execution_identifier) + if error: + return CORRUPTED_EXECUTION + return output_files diff --git a/server/resources/executions.py b/server/resources/executions.py index 769c264..c32b3ca 100644 --- a/server/resources/executions.py +++ b/server/resources/executions.py @@ -1,36 +1,40 @@ +import logging from flask_restful import Resource, request from sqlalchemy.exc import IntegrityError from server.database import db from server.database.models.execution import Execution, ExecutionStatus -from server.common.error_codes_and_messages import UNEXPECTED_ERROR +from server.platform_properties import PLATFORM_PROPERTIES +from server.common.error_codes_and_messages import UNEXPECTED_ERROR, INVOCATION_INITIALIZATION_FAILED +from server.resources.helpers.pipelines import ( + get_original_descriptor_path_and_type) from server.resources.helpers.executions import ( write_inputs_to_file, create_execution_directory, get_execution_as_model, - validate_request_model, filter_executions, delete_execution_directory) + validate_request_model, delete_execution_directory, + copy_descriptor_to_execution_dir, create_absolute_path_inputs, + query_converter) from server.database.queries.executions import (get_all_executions_for_user, get_execution) from .models.execution import ExecutionSchema from .decorators import unmarshal_request, marshal_response, login_required +from server.resources.models.descriptor.descriptor_abstract import Descriptor class Executions(Resource): @login_required @marshal_response(ExecutionSchema(many=True)) def get(self, user): - offset = request.args.get('offset') - limit = request.args.get('limit') - user_executions = get_all_executions_for_user(user.username, - db.session) + offset = request.args.get('offset', type=query_converter) + limit = request.args.get( + 'limit', type=query_converter) or PLATFORM_PROPERTIES.get( + 'defaultLimitListExecutions') + user_executions = get_all_executions_for_user(user.username, limit, + offset, db.session) for i, execution in enumerate(user_executions): exe, error = get_execution_as_model(user.username, execution) if error: return error user_executions[i] = exe - user_executions, error = filter_executions(user_executions, offset, - limit) - if error: - return error - return user_executions @login_required @@ -42,9 +46,18 @@ def post(self, model, user): return error try: + # Get the descriptor path and type + (descriptor_path, + descriptor_type), error = get_original_descriptor_path_and_type( + model.pipeline_identifier) + if error: + return error + + # Insert new execution to DB new_execution = Execution( name=model.name, pipeline_identifier=model.pipeline_identifier, + descriptor=descriptor_type, timeout=model.timeout, status=ExecutionStatus.Initializing, study_identifier=model.study_identifier, @@ -52,20 +65,69 @@ def post(self, model, user): db.session.add(new_execution) db.session.commit() - path, error = create_execution_directory(new_execution, user) + # Execution directory creation + (execution_path, + carmin_files_path), error = create_execution_directory( + new_execution, user) if error: db.session.rollback() return error - error = write_inputs_to_file(model, path) + # Writing inputs to inputs file in execution directory + error = write_inputs_to_file(model, carmin_files_path) if error: - delete_execution_directory(path) + delete_execution_directory(execution_path) 
db.session.rollback() return error + # Copying pipeline descriptor to execution folder + error = copy_descriptor_to_execution_dir(carmin_files_path, + descriptor_path) + if error: + delete_execution_directory(execution_path) + db.session.rollback() + return UNEXPECTED_ERROR + + # Get appriopriate descriptor object + descriptor = Descriptor.descriptor_factory_from_type( + new_execution.descriptor) + if not descriptor: + delete_execution_directory(execution_path) + db.session.rollback() + # We don't have any descriptor defined for this pipeline type + logger = logging.getLogger('server-error') + logger.error( + "Unsupported descriptor type extracted from file at {}". + format(descriptor_path)) + return ErrorCodeAndMessageFormatter( + UNSUPPORTED_DESCRIPTOR_TYPE, descriptor_type) + + # Create a version of the inputs file with correct links + modified_inputs_path, error = create_absolute_path_inputs( + user.username, new_execution.identifier, + new_execution.pipeline_identifier, request.url_root) + if error: + delete_execution_directory(execution_path) + db.session.rollback() + return UNEXPECTED_ERROR + + # We now validate the invocation + success, error = descriptor.validate(descriptor_path, + modified_inputs_path) + if not success: # If this fails, we will change the execution status to InitializationFailed and return this error + new_execution.status = ExecutionStatus.InitializationFailed + db.session.commit() + error_code_and_message = ErrorCodeAndMessageFormatter( + INVOCATION_INITIALIZATION_FAILED, new_execution.identifier) + return ErrorCodeAndMessageAdditionalDetails( + error_code_and_message, str(error)) + + # Get execution from DB (for safe measure) execution_db = get_execution(new_execution.identifier, db.session) if not execution_db: return UNEXPECTED_ERROR + + # Get execution back as a model from the DB for response execution, error = get_execution_as_model(user.username, execution_db) if error: diff --git a/server/resources/helpers/execution.py b/server/resources/helpers/execution.py index 664f5a0..2d94d54 100644 --- a/server/resources/helpers/execution.py +++ b/server/resources/helpers/execution.py @@ -1,106 +1,37 @@ -import sys import os -from threading import Timer -from subprocess import Popen -from multiprocessing import Pool, current_process -from server.database.models.user import User -from server.database.models.execution import ExecutionStatus, current_milli_time -from server.database.queries.executions import get_execution -from server.resources.models.execution import Execution -from server.database import db -from server.database.models.execution_process import ExecutionProcess -from server.resources.helpers.path import get_user_data_directory -from server.resources.helpers.executions import get_execution_dir +from pathlib import PurePath +from server import app +from server.resources.helpers.pathnames import EXECUTIONS_DIRNAME -def start_execution(user: User, execution: Execution, - boutique_descriptor_path: str, inputs_path: str, **kwargs): - #Launch the execution process +def extract_execution_identifier_from_path( + absolute_path_to_resource: str) -> str: - pool = Pool() - pool.apply_async( - func=execution_process, - kwds={ - "user": user, - "execution": execution, - "boutique_descriptor_path": boutique_descriptor_path, - "inputs_path": inputs_path - }, - callback=execution_success, - error_callback=execution_error) + rel_path = PurePath( + os.path.relpath(absolute_path_to_resource, + app.config['DATA_DIRECTORY'])).as_posix() - -def execution_process(user: User, 
execution: Execution, - boutique_descriptor_path: str, inputs_path: str): - #1 Write the current execution pid to database - execution_process = ExecutionProcess( - execution_identifier=execution.identifier, pid=current_process().pid) - db.session.add(execution_process) - db.session.commit() - - #2 Change the execution status in the database - execution_db = get_execution(execution.identifier, db.session) - execution_db.status = ExecutionStatus.Running - execution_db.start_date = current_milli_time() - db.session.commit() - - #3 Launch the bosh execution - user_data_dir = get_user_data_directory(user.username) - execution_dir = get_execution_dir(user.username, execution.identifier) + normalized_path = os.path.normpath(rel_path) + split_path = normalized_path.split(os.sep) try: - with open(os.path.join(execution_dir, "stdout.txt"), - 'wb') as file_stdout, open( - os.path.join(execution_dir, "stderr.txt"), - 'wb') as file_stderr: - - process = Popen( - [ - "bosh", "exec", "launch", - "-v{0}:{0}".format(user_data_dir), - boutique_descriptor_path, inputs_path - ], - stderr=sys.stderr, - stdout=sys.stdout, - cwd=execution_dir) - - # os.remove(inputs_path) # TODO: Validate when it is safe to delete the input file. - process.wait() - - except OSError: - execution_db.status = ExecutionStatus.ExecutionFailed - db.session.delete(execution_process) - db.session.commit() - return - finally: - process.kill() - - #4 Execution completed - Writing to database - execution_db.status = ExecutionStatus.Finished - db.session.delete(execution_process) - db.session.commit() - - # time_out = Timer( - # execution_db.timeout - # if execution_db.timeout else 10, execution_timeout, [process] - # ) #TODO: Look at default timeout in platform properties. Not required for now. Should it be? 
- - # try: - # time_out.start() - # process.communicate() - # time_out.cancel() - # finally: - - -def execution_success(success): - return - - -def execution_error(error): - return - - -def execution_timeout(process): - process.kill() - - return \ No newline at end of file + executions_folder_index = split_path.index(EXECUTIONS_DIRNAME) + # The execution folder should be at the root of the user data directory + # Otherwise, this is not an execution + if executions_folder_index != 1: + return None + + actual_execution_folder_index = executions_folder_index + 1 + + if len(split_path) < actual_execution_folder_index + 1: + return None + elif len(split_path) == actual_execution_folder_index + 1: + # We have an execution folder + if os.path.isdir(absolute_path_to_resource): + return split_path[actual_execution_folder_index] + else: + return None + else: + return split_path[actual_execution_folder_index] + except ValueError: + return None diff --git a/server/resources/helpers/execution_kill.py b/server/resources/helpers/execution_kill.py new file mode 100644 index 0000000..d10c970 --- /dev/null +++ b/server/resources/helpers/execution_kill.py @@ -0,0 +1,50 @@ +import signal +from subprocess import call +from typing import List +from psutil import Process, wait_procs, NoSuchProcess +from server.database.models.execution_process import ExecutionProcess + + +def kill_all_execution_processes(execution_processes: List[ExecutionProcess]): + actual_execution_processes = [ + e for e in execution_processes if e.is_execution + ] + execution_parent_processes = [ + e for e in execution_processes if not e.is_execution + ] + + kill_execution_processes(execution_parent_processes) + kill_execution_processes(actual_execution_processes) + + +def kill_execution_processes(processes: List[ExecutionProcess]): + for process_entry in processes: + try: + process = Process(process_entry.pid) + children = process.children(recursive=True) + children.append(process) + for p in children: + call(['kill', '-s', 'TERM', str(p.pid)]) + _, alive = wait_procs(children, timeout=2) + for p in alive: + call(['kill', '-s', 'QUIT', str(p.pid)]) + except NoSuchProcess: + # The process was already killed. Let's continue + pass + + +def get_process_alive_count(processes: List[ExecutionProcess], + count_children: bool = False): + count = 0 + for process_entry in processes: + try: + process = Process(process_entry.pid) + count += 1 + if count_children: + children = process.children(recursive=True) + count += len(children) + except NoSuchProcess: + # The process was already killed. 
Let's continue + pass + + return count diff --git a/server/resources/helpers/execution_play.py b/server/resources/helpers/execution_play.py new file mode 100644 index 0000000..f31ccaa --- /dev/null +++ b/server/resources/helpers/execution_play.py @@ -0,0 +1,122 @@ +import sys +import os +from subprocess import Popen, TimeoutExpired +from multiprocessing import Pool, current_process +from server import app +from server.platform_properties import PLATFORM_PROPERTIES +from server.database import db +from server.database.models.user import User +from server.database.models.execution import ExecutionStatus, current_milli_time +from server.database.models.execution_process import ExecutionProcess +from server.database.queries.executions import get_execution +from server.resources.models.execution import Execution +from server.resources.helpers.path import get_user_data_directory +from server.resources.helpers.executions import ( + get_execution_dir, get_descriptor_path, std_file_path, STDOUT_FILENAME, + STDERR_FILENAME) +from server.resources.helpers.execution_kill import kill_execution_processes +from server.resources.models.descriptor.descriptor_abstract import Descriptor + + +def start_execution(user: User, execution: Execution, descriptor: Descriptor, + inputs_path: str): + # Launch the execution process + if app.config["TESTING"]: + execution_process( + user=user, + execution=execution, + descriptor=descriptor, + inputs_path=inputs_path) + else: + pool = Pool(processes=1) + pool.apply_async( + func=execution_process, + kwds={ + "user": user, + "execution": execution, + "descriptor": descriptor, + "inputs_path": inputs_path + }) + pool.close() + + +def execution_process(user: User, execution: Execution, descriptor: Descriptor, + inputs_path: str): + # 1 Write the current execution pid to database + execution_process = ExecutionProcess( + execution_identifier=execution.identifier, + pid=current_process().pid, + is_execution=False) + db.session.add(execution_process) + db.session.commit() + + # 2 Change the execution status in the database + execution_db = get_execution(execution.identifier, db.session) + execution_db.status = ExecutionStatus.Running + execution_db.start_date = current_milli_time() + db.session.commit() + + # 3 Launch the bosh execution + user_data_dir = get_user_data_directory(user.username) + execution_dir = get_execution_dir(user.username, execution.identifier) + descriptor_path = get_descriptor_path(user.username, execution.identifier) + timeout = execution.timeout + if timeout is None: + timeout = PLATFORM_PROPERTIES.get("defaultExecutionTimeout") + if not timeout: + timeout = None + + with open( + std_file_path(user.username, execution.identifier, + STDOUT_FILENAME), + 'w') as file_stdout, open( + std_file_path(user.username, execution.identifier, + STDERR_FILENAME), 'w') as file_stderr: + try: + process = Popen( + descriptor.execute(user_data_dir, descriptor_path, + inputs_path), + stdout=file_stdout, + stderr=file_stderr, + cwd=execution_dir) + + # Insert Popen process in DB + execution_process_popen = ExecutionProcess( + execution_identifier=execution.identifier, + pid=process.pid, + is_execution=True) + db.session.add(execution_process_popen) + db.session.commit() + + exit_code = process.wait(timeout=timeout) + except TimeoutExpired as timeout_expired: # Timeout + kill_execution_processes([execution_process_popen]) + file_stderr.writelines( + "Execution timed out after {} seconds".format( + timeout_expired.timeout)) + ExecutionFailed(execution_db) + except Exception: 
# Any other execution issue + ExecutionFailed(execution_db) + kill_execution_processes([execution_process_popen]) + else: + # 4 Execution successfully completed - Writing to database + if exit_code == 0: + execution_db.status = ExecutionStatus.Finished + else: + execution_db.status = ExecutionStatus.ExecutionFailed + db.session.commit() + finally: + # Delete Execution processes from the database + db.session.delete(execution_process) + db.session.delete(execution_process_popen) + # Insert endtime + execution_db.end_date = current_milli_time() + db.session.commit() + + # Delete temporary absolute input paths files + os.remove(inputs_path) + + +def ExecutionFailed(execution_db): + execution_db.status = ExecutionStatus.ExecutionFailed + db.session.commit() diff --git a/server/resources/helpers/execution_results.py b/server/resources/helpers/execution_results.py new file mode 100644 index 0000000..3b4a9de --- /dev/null +++ b/server/resources/helpers/execution_results.py @@ -0,0 +1,28 @@ +import os +try: + from os import scandir, walk +except ImportError: + from scandir import scandir, walk +from typing import List +from server.resources.helpers.executions import CARMIN_FILES_FOLDER, get_execution_dir +from server.resources.models.error_code_and_message import ErrorCodeAndMessage +from server.resources.models.path import Path + + +def get_output_files(username: str, execution_identifier: str + ) -> (List[str], ErrorCodeAndMessage): + try: + execution_dir = get_execution_dir(username, execution_identifier) + except FileNotFoundError: + return None, PATH_DOES_NOT_EXIST + + excluded_dirs = [CARMIN_FILES_FOLDER] + output_files = list() + for root, dirs, files in walk(execution_dir): + dirs[:] = [d for d in dirs if d not in CARMIN_FILES_FOLDER] + + for f in files: + real_path = os.path.realpath(os.path.join(root, f)) + output_files.append(Path.object_from_pathname(real_path)) + + return output_files, None diff --git a/server/resources/helpers/executions.py b/server/resources/helpers/executions.py index 0f701a9..df9c293 100644 --- a/server/resources/helpers/executions.py +++ b/server/resources/helpers/executions.py @@ -1,27 +1,25 @@ import os import json import shutil +import tempfile from boutiques import bosh from typing import Dict from server import app -from server.database.models.user import User +from server.database.models.user import User, Role from server.database.models.execution import Execution as ExecutionDB +from server.platform_properties import PLATFORM_PROPERTIES from server.resources.models.error_code_and_message import ErrorCodeAndMessage from server.resources.models.pipeline import Pipeline, PipelineSchema from server.common.error_codes_and_messages import ( UNAUTHORIZED, INVALID_INPUT_FILE, INVALID_PATH, INVALID_MODEL_PROVIDED, INVALID_PIPELINE_IDENTIFIER, EXECUTION_IDENTIFIER_MUST_NOT_BE_SET, - INVALID_QUERY_PARAMETER, PATH_DOES_NOT_EXIST, UNEXPECTED_ERROR, - ErrorCodeAndMessageFormatter) -from server.resources.models.execution import Execution + INVALID_QUERY_PARAMETER, INVALID_EXECUTION_TIMEOUT, PATH_DOES_NOT_EXIST, + UNEXPECTED_ERROR, ErrorCodeAndMessageFormatter) +from server.resources.models.execution import Execution, EXECUTION_COMPLETED_STATUSES from server.resources.helpers.pipelines import get_pipeline - -INPUTS_FILENAME = "inputs.json" -EXECUTIONS_DIRNAME = "executions" -ABSOLUTE_PATH_INPUTS_FILENAME = "inputs_abs.json" - -STDOUT_FILENAME = "stdout.txt" -STDERR_FILENAME = "stderr.txt" +from server.resources.helpers.pathnames import ( + INPUTS_FILENAME, 
EXECUTIONS_DIRNAME, DESCRIPTOR_FILENAME, + CARMIN_FILES_FOLDER, STDOUT_FILENAME, STDERR_FILENAME) def create_user_executions_dir(username: str): @@ -34,19 +32,23 @@ def create_user_executions_dir(username: str): return user_execution_dir -def create_execution_directory(execution: ExecutionDB, - user: User) -> (str, ErrorCodeAndMessage): +def create_execution_directory(execution: ExecutionDB, user: User + ) -> ((str, str), ErrorCodeAndMessage): user_execution_dir = create_user_executions_dir(user.username) execution_dir_absolute_path = os.path.join(user_execution_dir, execution.identifier) + carmin_dir_absolute_path = os.path.join(execution_dir_absolute_path, + CARMIN_FILES_FOLDER) if not is_safe_path(execution_dir_absolute_path) or not is_data_accessible( execution_dir_absolute_path, user): - return None, UNAUTHORIZED + return (None, None), UNAUTHORIZED path, error = create_directory(execution_dir_absolute_path) - return execution_dir_absolute_path, error + carmin_path, error = create_directory(carmin_dir_absolute_path) + + return (execution_dir_absolute_path, carmin_dir_absolute_path), error def delete_execution_directory(execution_dir_path: str): @@ -62,6 +64,15 @@ def get_execution_dir(username: str, execution_identifier: str) -> str: return execution_dir +def get_execution_carmin_files_dir(username: str, + execution_identifier: str) -> str: + execution_carmin_files_dir = os.path.join( + get_execution_dir(username, execution_identifier), CARMIN_FILES_FOLDER) + if not os.path.isdir(execution_carmin_files_dir): + raise FileNotFoundError + return execution_carmin_files_dir + + def write_inputs_to_file(execution: Execution, path_to_execution_dir: str) -> ErrorCodeAndMessage: inputs_json_file = os.path.join(path_to_execution_dir, INPUTS_FILENAME) @@ -74,10 +85,10 @@ def write_inputs_to_file(execution: Execution, def write_absolute_path_inputs_to_file( - input_values: Dict, - path_to_execution_dir: str) -> (str, ErrorCodeAndMessage): - inputs_json_file = os.path.join(path_to_execution_dir, - ABSOLUTE_PATH_INPUTS_FILENAME) + username: str, execution_identifier: str, + input_values: Dict) -> (str, ErrorCodeAndMessage): + inputs_json_file = get_absolute_path_inputs_path(username, + execution_identifier) write_content = json.dumps(input_values) try: with open(inputs_json_file, 'w') as f: @@ -88,6 +99,15 @@ def write_absolute_path_inputs_to_file( return inputs_json_file, None +def get_absolute_path_inputs_path(username: str, + execution_identifier: str) -> str: + carmin_files_dir = get_execution_carmin_files_dir(username, + execution_identifier) + absolute_path_inputs_path = os.path.join( + carmin_files_dir, "{}.json".format(execution_identifier)) + return absolute_path_inputs_path + + def input_files_exist(input_values: Dict, pipeline: Pipeline, url_root: str) -> (bool, str): pipeline_parameters = pipeline.parameters @@ -119,9 +139,8 @@ def create_absolute_path_inputs(username: str, execution_identifier: str, input_values[key] = path_from_data_dir(url_root, input_values[key]) - execution_dir = get_execution_dir(username, execution_identifier) - path, error = write_absolute_path_inputs_to_file(input_values, - execution_dir) + path, error = write_absolute_path_inputs_to_file( + username, execution_identifier, input_values) if error: return None, error return path, None @@ -129,9 +148,8 @@ def create_absolute_path_inputs(username: str, execution_identifier: str, def load_inputs(username: str, execution_identifier: str) -> (Dict, ErrorCodeAndMessage): - execution_inputs_absolute_path = os.path.join( 
- get_user_data_directory(username), EXECUTIONS_DIRNAME, - execution_identifier, INPUTS_FILENAME) + execution_inputs_absolute_path = get_inputs_file_path( + username, execution_identifier) if not os.path.exists(execution_inputs_absolute_path): return None, INVALID_PATH @@ -141,10 +159,10 @@ def load_inputs(username: str, return inputs, None -def inputs_file_path(username: str, execution_identifier: str) -> str: +def get_inputs_file_path(username: str, execution_identifier: str) -> str: return os.path.join( get_user_data_directory(username), EXECUTIONS_DIRNAME, - execution_identifier, INPUTS_FILENAME) + execution_identifier, CARMIN_FILES_FOLDER, INPUTS_FILENAME) def get_execution_as_model(username: str, @@ -153,17 +171,27 @@ def get_execution_as_model(username: str, return None, INVALID_MODEL_PROVIDED inputs, error = load_inputs(username, execution_db.identifier) if error: - return None, error + inputs = {"error": "Error retrieving inputs for execution."} dummy_exec = Execution() execution_kwargs = { prop: execution_db.__dict__[prop] for prop in dummy_exec.__dict__.keys() if prop in execution_db.__dict__ } exe = Execution(input_values=inputs, **execution_kwargs) + if exe.status in EXECUTION_COMPLETED_STATUSES: + """This implementation does not currently respect the current + (0.3) API specification. It simply returns a list of output files that + were generated from the execution.""" + from server.resources.helpers.execution_results import get_output_files + output_files = get_output_files(username, exe.identifier) + path_list = [] + for output in output_files: + path_list.append(output.platform_path) + exe.returned_files = path_list return exe, None -def validate_request_model(model: dict, +def validate_request_model(model: Execution, url_root: str) -> (bool, ErrorCodeAndMessage): if model.identifier: return False, EXECUTION_IDENTIFIER_MUST_NOT_BE_SET @@ -176,38 +204,54 @@ def validate_request_model(model: dict, error_code_and_message = ErrorCodeAndMessageFormatter( INVALID_INPUT_FILE, error) return False, error_code_and_message + + # Timeout validation + min_authorized_execution_timeout = PLATFORM_PROPERTIES.get( + "minAuthorizedExecutionTimeout", 0) + max_authorized_execution_timeout = PLATFORM_PROPERTIES.get( + "maxAuthorizedExecutionTimeout", 0) + if model.timeout and ((max_authorized_execution_timeout > 0 and + model.timeout > max_authorized_execution_timeout) or + (model.timeout < min_authorized_execution_timeout)): + error_code_and_message = ErrorCodeAndMessageFormatter( + INVALID_EXECUTION_TIMEOUT, min_authorized_execution_timeout, + max_authorized_execution_timeout or "(no maximum timeout)") + return False, error_code_and_message return True, None -def filter_executions(executions, offset, limit): - if offset: - try: - offset = int(offset) - except ValueError: - return None, ErrorCodeAndMessageFormatter(INVALID_QUERY_PARAMETER, - offset, 'offset') - if offset < 0: - return None, ErrorCodeAndMessageFormatter(INVALID_QUERY_PARAMETER, - offset, 'offset') - executions = executions[offset:] - if limit: - try: - limit = int(limit) - except ValueError: - return None, ErrorCodeAndMessageFormatter(INVALID_QUERY_PARAMETER, - limit, 'limit') - if limit < 0: - return None, ErrorCodeAndMessageFormatter(INVALID_QUERY_PARAMETER, - limit, 'limit') - executions = executions[0:limit] - return executions, None +def query_converter(value): + converted_value = int(value) + if converted_value < 0: + raise ValueError + return converted_value + + +def copy_descriptor_to_execution_dir(execution_path, + 
descriptor_path) -> ErrorCodeAndMessage: + if not os.path.exists(descriptor_path): + return PATH_DOES_NOT_EXIST + + try: + shutil.copyfile(descriptor_path, + os.path.join(execution_path, DESCRIPTOR_FILENAME)) + except OSError: + return UNEXPECTED_ERROR + + return None + + +def get_descriptor_path(username: str, execution_identifier: str) -> str: + return os.path.join( + get_execution_carmin_files_dir(username, execution_identifier), + DESCRIPTOR_FILENAME) def std_file_path(username: str, execution_identifier: str, filename: str) -> str: return os.path.join( get_user_data_directory(username), EXECUTIONS_DIRNAME, - execution_identifier, filename) + execution_identifier, CARMIN_FILES_FOLDER, filename) def get_std_file(username: str, execution_identifier: str, @@ -221,6 +265,12 @@ def get_std_file(username: str, execution_identifier: str, return None, PATH_DOES_NOT_EXIST +def is_safe_for_get(user: User, execution_db: ExecutionDB): + if user.role == Role.admin: + return True + return execution_db.creator_username == user.username + + from .path import (create_directory, get_user_data_directory, is_safe_path, is_data_accessible, platform_path_exists, path_from_data_dir) diff --git a/server/resources/helpers/pathnames.py b/server/resources/helpers/pathnames.py new file mode 100644 index 0000000..be887fd --- /dev/null +++ b/server/resources/helpers/pathnames.py @@ -0,0 +1,7 @@ +INPUTS_FILENAME = "inputs.json" +EXECUTIONS_DIRNAME = "executions" +DESCRIPTOR_FILENAME = "descriptor.json" +CARMIN_FILES_FOLDER = ".carmin-files" + +STDOUT_FILENAME = "stdout.txt" +STDERR_FILENAME = "stderr.txt" diff --git a/server/resources/helpers/pipelines.py b/server/resources/helpers/pipelines.py index 314c4bb..6caa377 100644 --- a/server/resources/helpers/pipelines.py +++ b/server/resources/helpers/pipelines.py @@ -4,8 +4,11 @@ except ImportError: from scandir import scandir, walk import json +import logging from boutiques import bosh from server import app +from server.resources.models.descriptor.descriptor_abstract import Descriptor +from server.resources.models.descriptor.supported_descriptors import SUPPORTED_DESCRIPTORS from server.resources.models.pipeline import Pipeline, PipelineSchema from server.common.error_codes_and_messages import ( ErrorCodeAndMessageAdditionalDetails, ErrorCodeAndMessageFormatter, @@ -23,8 +26,9 @@ def pipelines(pipeline_identifier: str = None, if study_identifier: dirs[:] = [i for i in dirs if i == study_identifier] - # We exclude the boutiques folder as it includes the boutiques original descriptors - dirs[:] = [i for i in dirs if i != "boutiques"] + # We exclude the descriptor folders as they include the + # original, non-converted descriptors + dirs[:] = [i for i in dirs if i not in SUPPORTED_DESCRIPTORS.keys()] for file in files: real_path = os.path.realpath(os.path.join(subdir, file)) @@ -69,39 +73,43 @@ def get_pipeline(pipeline_identifier: str, for pipeline in all_pipelines: with open(pipeline.path) as f: - pipeline_json = json.load(f) - if pipeline_json["identifier"] == pipeline_identifier: - return pipeline if only_path else PipelineSchema().load( - pipeline_json).data - return None - + try: + pipeline_json = json.load(f) + if pipeline_json["identifier"] == pipeline_identifier: + return pipeline if only_path else PipelineSchema().load( + pipeline_json).data + except json.JSONDecodeError: + # We log the invalid pipeline, but just continue instead of crashing + logger = logging.getLogger('server-error') + logger.error("Invalid pipeline at {}".format(pipeline.path)) -def 
export_boutiques_pipelines() -> (bool, str): - all_pipelines = get_all_pipelines("boutiques") + return None - for pipeline in all_pipelines: - carmin_pipeline = os.path.join(app.config['PIPELINE_DIRECTORY'], - "boutiques_{}".format(pipeline.name)) - try: - bosh(["export", "carmin", pipeline.path, carmin_pipeline]) - except Exception: - return False, "Boutiques descriptor at '{}' is invalid and could not be translated. Please fix it before launching the server.".format( - pipeline.path) +def export_all_pipelines() -> (bool, str): + for descriptor_type in SUPPORTED_DESCRIPTORS: + all_pipelines = get_all_pipelines(descriptor_type) - if not os.path.exists(carmin_pipeline): - return False, "Boutiques descriptor at '{}' was exported without error, but no output file was created." + for pipeline in all_pipelines: + carmin_pipeline = os.path.join(app.config['PIPELINE_DIRECTORY'], + "{}_{}".format( + descriptor_type, pipeline.name)) + descriptor = Descriptor.descriptor_factory_from_type( + descriptor_type) + export, error = descriptor.export(pipeline.path, carmin_pipeline) + if error: + return False, error return True, None -def get_original_descriptor_path( - pipeline_identifier: str) -> (str, ErrorCodeAndMessage): +def get_original_descriptor_path_and_type( + pipeline_identifier: str) -> ((str, str), ErrorCodeAndMessage): carmin_descriptor_path = get_pipeline(pipeline_identifier, True) if not carmin_descriptor_path: - return None, INVALID_PIPELINE_IDENTIFIER + return (None, None), INVALID_PIPELINE_IDENTIFIER descriptor_type = carmin_descriptor_path.name[:carmin_descriptor_path.name. index("_")] @@ -110,7 +118,7 @@ def get_original_descriptor_path( original_descriptor_path = os.path.join(app.config['PIPELINE_DIRECTORY'], descriptor_type, original_descriptor_filename) - return original_descriptor_path, None + return (original_descriptor_path, descriptor_type), None def get_descriptor_json(pipeline_path: str) -> (any, ErrorCodeAndMessage): diff --git a/server/resources/helpers/std.py b/server/resources/helpers/std.py index d86f99d..810b689 100644 --- a/server/resources/helpers/std.py +++ b/server/resources/helpers/std.py @@ -2,10 +2,10 @@ from server.database import db from server.common.utils import marshal from server.common.error_codes_and_messages import ( - ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND) + ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND, UNAUTHORIZED) from server.database.queries.executions import get_execution -from server.resources.helpers.executions import (get_execution_as_model, - get_std_file, STDERR_FILENAME) +from server.resources.helpers.executions import ( + get_execution_as_model, get_std_file, STDERR_FILENAME, is_safe_for_get) from server.resources.decorators import login_required @@ -15,7 +15,12 @@ def std_file_resource(user, execution_identifier, path_to_file): error = ErrorCodeAndMessageFormatter(EXECUTION_NOT_FOUND, execution_identifier) return marshal(error), 400 - execution, error = get_execution_as_model(user.username, execution_db) + + if not is_safe_for_get(user, execution_db): + return UNAUTHORIZED + + execution, error = get_execution_as_model(execution_db.creator_username, + execution_db) if error: return marshal(error), 400 diff --git a/server/resources/models/descriptor/__init__.py b/server/resources/models/descriptor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/resources/models/descriptor/boutiques.py b/server/resources/models/descriptor/boutiques.py new file mode 100644 index 0000000..a815a6c --- /dev/null +++ 
b/server/resources/models/descriptor/boutiques.py @@ -0,0 +1,39 @@ +import os +from boutiques import bosh +from jsonschema import ValidationError +from server import app +from server.resources.models.descriptor.descriptor_abstract import Descriptor + + +class Boutiques(Descriptor): + @classmethod + def validate(cls, descriptor_path, input_path): + try: + bosh(["invocation", descriptor_path, "-i", input_path]) + except ValidationError as e: + return False, e.message + return True, None + + @classmethod + def export(cls, input_descriptor_path, output_descriptor_path): + relative_path = os.path.relpath( + output_descriptor_path, start=app.config['PIPELINE_DIRECTORY']) + try: + bosh([ + "export", "carmin", input_descriptor_path, "--identifier", + relative_path, output_descriptor_path + ]) + except Exception: + return False, "Boutiques descriptor at '{}' is invalid and could not be translated. Please fix it before launching the server.".format( + input_descriptor_path) + + if not os.path.exists(output_descriptor_path): + return False, "Boutiques descriptor at '{}' was exported without error, but no output file was created." + return True, None + + @classmethod + def execute(cls, user_data_dir, descriptor, input_data): + return [ + "bosh", "exec", "launch", "-v{0}:{0}".format(user_data_dir), + descriptor, input_data + ] diff --git a/server/resources/models/descriptor/descriptor_abstract.py b/server/resources/models/descriptor/descriptor_abstract.py new file mode 100644 index 0000000..8b99aa0 --- /dev/null +++ b/server/resources/models/descriptor/descriptor_abstract.py @@ -0,0 +1,35 @@ +from abc import ABC, abstractmethod +import os + + +class Descriptor(ABC): + """Descriptors must subclass Desciptor to define their behavior. + Refer to `boutiques.py` for an example of implementation. + Descriptors that subclass Descriptor must be included in + `SUPPORTED_DESCRIPTORS` in `supported_descriptors.py`.""" + + @classmethod + @abstractmethod + def validate(cls, descriptor, input_data): + pass + + @classmethod + @abstractmethod + def export(cls, input_descriptor_path, output_descriptor_path): + pass + + @classmethod + @abstractmethod + def execute(cls, user_data_dir, descriptor, input_data): + pass + + @classmethod + def descriptor_factory_from_type(cls, typ): + from server.resources.models.descriptor.supported_descriptors import SUPPORTED_DESCRIPTORS + return SUPPORTED_DESCRIPTORS.get(typ.lower())() + + @classmethod + def descriptor_factory_from_path(cls, path_to_descriptor): + parent_dir = os.path.dirname(path_to_descriptor) + return Descriptor.descriptor_factory_from_type( + os.path.basename(parent_dir)) diff --git a/server/resources/models/descriptor/supported_descriptors.py b/server/resources/models/descriptor/supported_descriptors.py new file mode 100644 index 0000000..bd08c22 --- /dev/null +++ b/server/resources/models/descriptor/supported_descriptors.py @@ -0,0 +1,7 @@ +from server.resources.models.descriptor.boutiques import Boutiques +""" +SUPPORTED_DESCRIPTORS contains all decriptors that are supported by the platform. +Keys should not include characters that can cause problems for filenames and/or +paths. Stay away from '/', '\', '?', for example. 
+""" +SUPPORTED_DESCRIPTORS = {'boutiques': Boutiques} diff --git a/server/resources/models/execution.py b/server/resources/models/execution.py index 6710394..e581693 100644 --- a/server/resources/models/execution.py +++ b/server/resources/models/execution.py @@ -15,6 +15,12 @@ class ExecutionStatus(enum.Enum): Killed = "Killed" +EXECUTION_COMPLETED_STATUSES = [ + ExecutionStatus.Finished, ExecutionStatus.ExecutionFailed, + ExecutionStatus.Unknown, ExecutionStatus.Killed +] + + class ExecutionSchema(Schema): SKIP_VALUES = list([None]) diff --git a/server/resources/models/path.py b/server/resources/models/path.py index 24bdd39..8d7441f 100644 --- a/server/resources/models/path.py +++ b/server/resources/models/path.py @@ -4,6 +4,7 @@ from server import app from flask_restful import request from marshmallow import Schema, fields, post_load, post_dump +from server.resources.helpers.execution import extract_execution_identifier_from_path class PathSchema(Schema): @@ -86,6 +87,8 @@ def object_from_pathname(cls, absolute_path_to_resource: str): mime_type, _ = mimetypes.guess_type(absolute_path_to_resource) # TODO: Add execution_id to Path object + execution_id = extract_execution_identifier_from_path( + absolute_path_to_resource) rel_path = PurePath( os.path.relpath(absolute_path_to_resource, @@ -96,7 +99,8 @@ def object_from_pathname(cls, absolute_path_to_resource: str): last_modification_date=os.path.getmtime(absolute_path_to_resource), is_directory=os.path.isdir(absolute_path_to_resource), size=Path.get_path_size(absolute_path_to_resource, is_directory), - mime_type=mime_type) + mime_type=mime_type, + execution_id=execution_id) @classmethod def get_path_size(cls, absolute_path: str, is_dir: bool) -> int: diff --git a/server/resources/path.py b/server/resources/path.py index 6925979..63f4c8e 100644 --- a/server/resources/path.py +++ b/server/resources/path.py @@ -7,8 +7,7 @@ from server.common.error_codes_and_messages import ( ErrorCodeAndMessageFormatter, UNAUTHORIZED, INVALID_PATH, INVALID_ACTION, MD5_ON_DIR, LIST_ACTION_ON_FILE, ACTION_REQUIRED, UNEXPECTED_ERROR, - PATH_DOES_NOT_EXIST, PATH_IS_DIRECTORY, INVALID_REQUEST, - PATH_DOES_NOT_EXIST) + PATH_IS_DIRECTORY, INVALID_REQUEST, PATH_DOES_NOT_EXIST) from .models.upload_data import UploadDataSchema from .models.boolean_response import BooleanResponse from .models.path import Path as PathModel diff --git a/server/resources/pipeline.py b/server/resources/pipeline.py index 0be2b08..6b37e2a 100644 --- a/server/resources/pipeline.py +++ b/server/resources/pipeline.py @@ -1,6 +1,7 @@ import os import json from flask_restful import Resource +from server.common.error_codes_and_messages import INVALID_PIPELINE_IDENTIFIER from .models.error_code_and_message import ErrorCodeAndMessage from .models.pipeline import Pipeline, PipelineSchema from .decorators import login_required, marshal_response @@ -14,4 +15,4 @@ def get(self, user, pipeline_identifier): pipeline = pipelines(pipeline_identifier) if isinstance(pipeline, ErrorCodeAndMessage): return pipeline - return next(iter(pipeline), {}) + return next(iter(pipeline), INVALID_PIPELINE_IDENTIFIER) diff --git a/server/resources/pipeline_boutiquesdescriptor.py b/server/resources/pipeline_boutiquesdescriptor.py index 95c610d..4faf79f 100644 --- a/server/resources/pipeline_boutiquesdescriptor.py +++ b/server/resources/pipeline_boutiquesdescriptor.py @@ -4,7 +4,7 @@ from server.common.utils import marshal from .models.error_code_and_message import ErrorCodeAndMessage from .decorators import login_required 
-from server.resources.helpers.pipelines import get_original_descriptor_path, get_descriptor_json +from server.resources.helpers.pipelines import get_original_descriptor_path_and_type, get_descriptor_json class PipelineBoutiquesDescriptor(Resource): @@ -13,8 +13,8 @@ class PipelineBoutiquesDescriptor(Resource): # Moreover, there is no BoutiquesSchema yet. If it was to be added, this should be changed. @login_required def get(self, user, pipeline_identifier): - boutiques_descriptor_path, error = get_original_descriptor_path( - pipeline_identifier) + (boutiques_descriptor_path, descriptor_type + ), error = get_original_descriptor_path_and_type(pipeline_identifier) if error: return marshal(error), 400 diff --git a/server/resources/post_processors.py b/server/resources/post_processors.py new file mode 100644 index 0000000..cb5477f --- /dev/null +++ b/server/resources/post_processors.py @@ -0,0 +1,14 @@ +from server import app + + +@app.after_request +def set_content_length_to_0(response): + """This is a bug in the current version of Flask. All 204 responses with + no body returns a Content-Length of 3, which causes problems with the + connection expectations. This function sets Content-Length to 0 for all + 204 responses with no body.""" + if (response.status_code == 204 + and response.headers.get('Content-Length') != '0' + and response.data == b'""\n'): + response.headers['Content-Length'] = 0 + return response diff --git a/server/startup_validation.py b/server/startup_validation.py index 534a23c..8491f36 100644 --- a/server/startup_validation.py +++ b/server/startup_validation.py @@ -7,22 +7,29 @@ import string import random from typing import Dict -from .config import (SUPPORTED_PROTOCOLS, SUPPORTED_MODULES, - SQLITE_DEFAULT_PROD_DB_URI, DEFAULT_PROD_DB_URI) +from .config import (SQLITE_DEFAULT_PROD_DB_URI, DEFAULT_PROD_DB_URI) from .resources.models.platform_properties import PlatformPropertiesSchema from server import app from server.database import db from .database.models.user import User, Role -from server.resources.helpers.pipelines import export_boutiques_pipelines +from .database.models.execution import Execution, ExecutionStatus +from .database.models.execution_process import ExecutionProcess +from .database.queries.executions import get_execution_processes +from server.resources.helpers.pipelines import export_all_pipelines from server.common.error_codes_and_messages import PATH_EXISTS +from server.resources.models.descriptor.supported_descriptors import SUPPORTED_DESCRIPTORS from server.platform_properties import PLATFORM_PROPERTIES +from server.resources.helpers.execution_kill import (get_process_alive_count, + kill_execution_processes) def start_up(): + create_dirs_for_supported_descriptors() pipeline_and_data_directory_present() export_pipelines() properties_validation() find_or_create_admin() + purge_executions() def properties_validation(config_data: Dict = None) -> bool: @@ -37,16 +44,6 @@ def properties_validation(config_data: Dict = None) -> bool: if err: raise EnvironmentError(err) - # Raise error if unsupported protocol or module - for protocol in platform_properties.supported_transfer_protocols: - if protocol not in SUPPORTED_PROTOCOLS: - err = str.format("Unsupported protocol {}", protocol) - raise ValueError(err) - for module in platform_properties.supported_modules: - if module not in SUPPORTED_MODULES: - err = str.format("Unsupported module {}", module) - raise ValueError(err) - # Raise error if https not in supported protocols if "https" not in 
platform_properties.supported_transfer_protocols: raise EnvironmentError('CARMIN 0.3 requires https support') @@ -108,10 +105,70 @@ def generate_admin_password(password_length=16): def export_pipelines(): - success, error = export_boutiques_pipelines() + success, error = export_all_pipelines() if not success: raise EnvironmentError(error) +def create_dirs_for_supported_descriptors(): + pipeline_path = app.config['PIPELINE_DIRECTORY'] + for key in SUPPORTED_DESCRIPTORS: + try: + os.mkdir(os.path.join(pipeline_path, key)) + except FileExistsError: + pass + except OSError: + raise EnvironmentError( + "Directory '{}' could not be created. This problem could be due to insufficient disk space.". + format(key)) + + +def purge_executions(): + # Let's get all executions running + executions = db.session.query(Execution).filter_by( + status=ExecutionStatus.Running) + + for e in executions: + # Look at if it has some processes linked to it + execution_processes = get_execution_processes(e.identifier, db.session) + + if not execution_processes: # Most probably due to the execution being in termination process + continue + + actual_execution_processes = [ + e for e in execution_processes if e.is_execution + ] + execution_parent_processes = [ + e for e in execution_processes if not e.is_execution + ] + + process_still_alive_count = 0 + process_still_alive_count += get_process_alive_count( + actual_execution_processes) + process_still_alive_count += get_process_alive_count( + execution_parent_processes) + + # The execution is going as expected + if process_still_alive_count == len(execution_processes): + continue + + # The execution is supposed to be still running but some of the processes it launched are no more active + # We will mark the execution as "ExecutionFailed" and kill the remaining processes + kill_execution_processes(execution_parent_processes) + kill_execution_processes(actual_execution_processes) + + e.status = ExecutionStatus.Unknown + for execution_process in execution_processes: + db.session.delete(execution_process) + db.session.commit() + + # Now that the executions marked as 'Running' have been purged, let's clean up the remaining execution processes + remaining_processes = db.session.query(ExecutionProcess) + for process in remaining_processes: + kill_execution_processes([process]) + db.session.delete(process) + db.session.commit() + + from server.resources.helpers.register import register_user diff --git a/server/test/conftest.py b/server/test/conftest.py index aaeaad4..ee0a870 100644 --- a/server/test/conftest.py +++ b/server/test/conftest.py @@ -5,9 +5,10 @@ from server.config import TestConfig -@pytest.yield_fixture(scope='session') +@pytest.yield_fixture(autouse=True, scope='session') def app(): - app = create_app(TestConfig) + from server import app + app.config.from_object(TestConfig) declare_api(app) ctx = app.app_context() ctx.push() diff --git a/server/test/database/app.db b/server/test/database/app.db index 32326922d14732a6ecbce086850413afb2512300..5d0e1c6e6267c8909d16a5af99b5242d4b45c002 100644 GIT binary patch delta 137 zcmZoTz}V2hG(l3BQU3q~0|N&XvokQ+CYPeT( zuih-E5W&xAII&TKkz=!Ry6iz<}sbzz$FjB1Mr@cUc-JsgcZG-DE?1F-VbYg*vil z%jh5UQf!yK$gbPV^fk6)dmO362@)hIu)_|`r$CBKedH(ps05%A>G7leqB5Z!Pmg33 z#=-5N(Fnc|!yvf(We@~QC+p21*w}abH{7~d8jGE)zq>ZR{pPQ)e8BA>X#e5AZoR$r z_UamN1Oz|;1V8`;KmY_l00cnbe-LPYY|h*|Uf9IUR)sChH2i66u>D|b5cVG)g#E+){k!4nN-2-br3WU(#Egp4 z3_sC8RUDb|yb62$gKRe&_%>&itmd;tt6dUpo=Z8a=KA#|&HAclIvr1k`B<9^4Nj|} zmTp3Hzqji$)uE-nSd>qVtX%#1%uKKBSRG{a&`qn(8XL)3HJnc_r1){r`(bPFB;3oM 
zgxxcTO`m@IVgK>LV9TA%nfFP@w;yENdtvwFQ?DO(JH4`~3OOp?6y>wd-LNw-QopWu z=4E+Owo{bF?77jt_%vSnao&-|=(s#v4E=yM2H1 z-HR}r;UL?|23dbQdmMVLu)8qZT-|Jc4B~8~5fr61uV%lD+(dLJ=he9WJUqR+hWK=M z$yNEVzP8r*b+5icePe!Yp1UrO6RoZ+QZ25Kb63Yi&So#iQ+?^d>3DQ{wRV&9mzA6p z!x!fDqMje4bK|$jdU5pXHcy7E-meEt>Z77I_0>*95Bt4GhgsM?$>gNm&;Qf!)~=HB zRfoiH{q!7vA`tt)mJ3|=zg7I)yxDF9?cdx677zdd5C8!X009sH0T2KI5C8!X_*@0v z1&RNc_ zvMNbcP7{+xlImP@7Rw~&8Yk9J8IdTDd;)i`rqpt(k^g|)1RuFu32z!p>+g2gJN3PQ z%TA#>(mx9k>pL_TlEg%$+FBEHE3D0xQdVM@#T_J8t61cmyW=7@Ic7#Axy4ElDY;GS z4E1J_d)19=v;B9_{@^ySfB*=900@8p2!H?xfB*=900@8p2z&+t%dNGBzw^*sZmqA> z_W_RQ|L=qLKkfIQK?k7=AOHd&00JNY0w4eaAOHd&00JOzeFVO4@y4b9Y0$jg>KxDe z7k~d>Jez-gaG<9k00JNY0w4eaAOHd&00JNY0w8d01^kNx+&lj2_xt&G12k_vYyWv| zHK5}l00JNY0w4eaAOHd&00JNY0{>N@+d6D$&SOJVic%_5M010Y5^01KwOlaFV@$=W z4~#fgBsVE?kGC^KX+)4DDo0jpE@GU-OqvxR=$^$_nhJ`zFfpP+Y9yF2NX85kqH}3Y zwCn>Jw|Q)0hPaX(x##mEB)JmEO3VD?`KdAdi#m0bTjCzEM}!fDD8m9N6>%g|%Vg|Y zR75WMz$D_Sh#nb*ORg2tYaE#HA()O5 ExecutionDB: creator_username=username) -POST_VALID_EXECUTION = Execution( - name="valid_execution", - pipeline_identifier="pipeline1", - input_values={ - "file_input": - "http://localhost/path/{}/test.txt".format(standard_user().username) - }) +def post_valid_execution(pipeline_identifier: str): + return Execution( + name="valid_execution", + pipeline_identifier=pipeline_identifier, + input_values={ + "input_file": + "http://localhost/path/{}/test.txt".format( + standard_user().username) + }) -POST_INVALID_EXECUTION_FILE_NOT_EXIST = Execution( - name="invalid_execution", - pipeline_identifier="pipeline1", - input_values={ - "file_input": - "http://localhost/path/{}/does_not_exist.txt".format( - standard_user().username) - }) -POST_INVALID_EXECUTION_ARRAY_FILE_NOT_EXIST = Execution( - name="invalid_execution", - pipeline_identifier="pipeline1", - input_values={ - "file_input": [ +def post_invalid_execution_file_not_exist(pipeline_identifier: str): + return Execution( + name="invalid_execution", + pipeline_identifier=pipeline_identifier, + input_values={ + "input_file": "http://localhost/path/{}/does_not_exist.txt".format( - standard_user().username), - "http://localhost/path/{}/test.txt".format( standard_user().username) - ] - }) + }) + + +def post_invalid_execution_array_file_not_exist(pipeline_identifier: str): + return Execution( + name="invalid_execution", + pipeline_identifier=pipeline_identifier, + input_values={ + "input_file": [ + "http://localhost/path/{}/does_not_exist.txt".format( + standard_user().username), + "http://localhost/path/{}/test.txt".format( + standard_user().username) + ] + }) + POST_INVALID_EXECUTION_IDENTIFIER_NOT_EXIST = Execution( name="invalid_execution", @@ -48,13 +55,16 @@ def execution_for_db(execution_id: str, username: str) -> ExecutionDB: "first": "value" }) -POST_INVALID_IDENTIFIER_SET = Execution( - name="invalid_execution", - pipeline_identifier="pipeline1", - identifier="an_identifier", - input_values={ - "first": "value" - }) + +def post_invalid_identifier_set(pipeline_identifier: str): + return Execution( + name="invalid_execution", + pipeline_identifier=pipeline_identifier, + identifier="an_invalid_identifier", + input_values={ + "first": "value" + }) + POST_INVALID_MODEL = {"name": "invalid_execution"} diff --git a/server/test/fakedata/pipelines.py b/server/test/fakedata/pipelines.py index 2408aca..b6b7eff 100644 --- a/server/test/fakedata/pipelines.py +++ b/server/test/fakedata/pipelines.py @@ -1,6 +1,35 @@ +import json +import copy from server.resources.models.pipeline import Pipeline, PipelineParameter from 
server.resources.models.error_code_and_message import ErrorCodeAndMessage + +class PipelineStub(): + def __init__(self, + original: dict, + converted: dict, + original_filename: str, + descriptor_type: str = "boutiques"): + self.original = copy.deepcopy(original) + self.converted = copy.deepcopy(converted) + self.descriptor_type = descriptor_type + self.original_filename = original_filename + self.identifier = "{}_{}".format(descriptor_type, original_filename) + self.converted["identifier"] = self.identifier + + def get_original_filename(self): + return self.original_filename + + def get_converted_filename(self): + return self.identifier + + def get_original_json(self): + return json.dumps(self.original) + + def get_converted_json(self): + return json.dumps(self.converted) + + NameStudyOne = "study_one" NameStudyTwo = "study_two" @@ -68,8 +97,8 @@ [ErrorCodeAndMessage(2000, "Pipeline three error code and message")])) PIPELINE_FOUR = Pipeline( - identifier="pipeline1", - name="pipeline1", + identifier="four", + name="four", version="4.0.0", description="test pipeline", can_execute=True, @@ -78,3 +107,177 @@ PropNameTwo: PropValueThree}, error_codes_and_messages=list( [ErrorCodeAndMessage(2000, "Pipeline four error code and message")])) + +BOUTIQUES_SLEEP_ORIGINAL = { + "command-line": + "sleep 30 && echo \"Welcome to CARMIN-Server, $(cat [INPUT_FILE]).\" &> [OUTPUT_FILE]", + "container-image": { + "image": "alpine", + "type": "docker" + }, + "description": + "A simple script to test output files", + "error-codes": [{ + "code": 2, + "description": "File does not exist." + }], + "inputs": [{ + "id": "input_file", + "name": "Input file", + "optional": False, + "type": "File", + "value-key": "[INPUT_FILE]" + }], + "invocation-schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "additionalProperties": False, + "dependencies": {}, + "description": "Invocation schema for output.", + "properties": { + "input_file": { + "type": "string" + } + }, + "required": ["input_file"], + "title": "output.invocationSchema", + "type": "object" + }, + "name": + "output", + "output-files": [{ + "id": + "output_file", + "name": + "Output file", + "path-template": + "./greeting.txt", + "path-template-stripped-extensions": + [".txt", ".mnc", ".cpp", ".m", ".j"], + "value-key": + "[OUTPUT_FILE]" + }], + "schema-version": + "0.5", + "tool-version": + "1.0" +} + +BOUTIQUES_NO_SLEEP_ORIGINAL = { + "command-line": + "echo \"Welcome to CARMIN-Server, $(cat [INPUT_FILE]).\" &> [OUTPUT_FILE]", + "container-image": { + "image": "alpine", + "type": "docker" + }, + "description": + "A simple script to test output files", + "error-codes": [{ + "code": 2, + "description": "File does not exist." 
+ }], + "inputs": [{ + "id": "input_file", + "name": "Input file", + "optional": False, + "type": "File", + "value-key": "[INPUT_FILE]" + }], + "invocation-schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "additionalProperties": False, + "dependencies": {}, + "description": "Invocation schema for output.", + "properties": { + "input_file": { + "type": "string" + } + }, + "required": ["input_file"], + "title": "output.invocationSchema", + "type": "object" + }, + "name": + "output", + "output-files": [{ + "id": + "output_file", + "name": + "Output file", + "path-template": + "./greeting.txt", + "path-template-stripped-extensions": + [".txt", ".mnc", ".cpp", ".m", ".j"], + "value-key": + "[OUTPUT_FILE]" + }], + "schema-version": + "0.5", + "tool-version": + "1.0" +} + +BOUTIQUES_SLEEP_CONVERTED = { + "identifier": + "pipeline1", + "name": + "output", + "version": + "1.0", + "description": + "A simple script to test output files", + "canExecute": + True, + "parameters": [{ + "name": "Input file", + "id": "input_file", + "type": "File", + "isOptional": False, + "isReturnedValue": False + }, { + "name": "Output file", + "id": "output_file", + "type": "File", + "isOptional": False, + "isReturnedValue": True + }], + "properties": { + "boutiques": True + }, + "errorCodesAndMessages": [{ + "errorCode": 2, + "errorMessage": "File does not exist." + }] +} + +BOUTIQUES_NO_SLEEP_CONVERTED = { + "identifier": + "pipeline1", + "name": + "output", + "version": + "1.0", + "description": + "A simple script to test output files", + "canExecute": + True, + "parameters": [{ + "name": "Input file", + "id": "input_file", + "type": "File", + "isOptional": False, + "isReturnedValue": False + }, { + "name": "Output file", + "id": "output_file", + "type": "File", + "isOptional": False, + "isReturnedValue": True + }], + "properties": { + "boutiques": True + }, + "errorCodesAndMessages": [{ + "errorCode": 2, + "errorMessage": "File does not exist." 
+ }] +} diff --git a/server/test/resources/test_execution.py b/server/test/resources/test_execution.py index 773b378..b0d0a94 100644 --- a/server/test/resources/test_execution.py +++ b/server/test/resources/test_execution.py @@ -2,26 +2,54 @@ import copy import pytest +from server import app from server.database.models.execution import Execution as ExecutionDB from server.resources.helpers.executions import get_execution_as_model from server.resources.models.execution import ExecutionSchema, Execution -from server.common.error_codes_and_messages import EXECUTION_NOT_FOUND +from server.common.error_codes_and_messages import EXECUTION_NOT_FOUND, ErrorCodeAndMessageFormatter from server.test.fakedata.executions import ( - POST_VALID_EXECUTION, PATCH_VALID_EXECUTION, PATCH_ILLEGAL_PARAMETER, + post_valid_execution, PATCH_VALID_EXECUTION, PATCH_ILLEGAL_PARAMETER, PATCH_ILLEGAL_PARAMETER2, PATCH_VALID_EXECUTION2, PATCH_VALID_EXECUTION3, PATCH_INVALID_PARAMETER, PATCH_NO_CHANGE_PARAMETER) -from server.test.resources.test_executions import test_config +from server.test.fakedata.pipelines import PipelineStub, BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED from server.test.fakedata.users import standard_user from server.test.utils import load_json_data, error_from_response from server.test.conftest import test_client, session @pytest.fixture -def execution_id(test_client) -> str: +def pipeline(): + return PipelineStub(BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED, + "sleep.json") + + +@pytest.fixture(autouse=True) +def test_config(tmpdir_factory, session, pipeline): + session.add(standard_user(encrypted=True)) + session.commit() + + pipelines_root = tmpdir_factory.mktemp('pipelines') + pipelines_root.join(pipeline.get_converted_filename()).write( + pipeline.get_converted_json()) + boutiques_dir = pipelines_root.mkdir(pipeline.descriptor_type) + boutiques_dir.join(pipeline.get_original_filename()).write( + pipeline.get_original_json()) + app.config['PIPELINE_DIRECTORY'] = str(pipelines_root) + + data_root = tmpdir_factory.mktemp('data') + user_dir = data_root.mkdir(standard_user().username) + user_dir.join('test.txt').write('test file') + user_execution_dir = user_dir.mkdir('executions') + app.config['DATA_DIRECTORY'] = str(data_root) + + +@pytest.fixture +def execution_id(test_client, pipeline) -> str: response = test_client.post( '/executions', headers={"apiKey": standard_user().api_key}, - data=json.dumps(ExecutionSchema().dump(POST_VALID_EXECUTION).data)) + data=json.dumps(ExecutionSchema().dump( + post_valid_execution(pipeline.identifier)).data)) json_response = load_json_data(response) return ExecutionSchema().load(json_response).data.identifier @@ -30,9 +58,7 @@ class TestExecutionResource(): def test_get_execution(self, test_client, execution_id): response = test_client.get( '/executions/{}'.format(execution_id), - headers={ - "apiKey": standard_user().api_key - }) + headers={"apiKey": standard_user().api_key}) json_response = load_json_data(response) execution = ExecutionSchema().load(json_response).data assert isinstance(execution, Execution) @@ -41,13 +67,10 @@ def test_get_invalid_execution(self, test_client): execution_id = "invalid" response = test_client.get( '/executions/{}'.format(execution_id), - headers={ - "apiKey": standard_user().api_key - }) + headers={"apiKey": standard_user().api_key}) error = error_from_response(response) - expected_error_code_and_message = copy.deepcopy(error) - expected_error_code_and_message.error_message = error.error_message.format( - 
execution_id) + expected_error_code_and_message = ErrorCodeAndMessageFormatter( + EXECUTION_NOT_FOUND, execution_id) assert error == expected_error_code_and_message def test_put_valid_execution_name_and_timeout(self, test_client, session, diff --git a/server/test/resources/test_execution_play.py b/server/test/resources/test_execution_play.py new file mode 100644 index 0000000..21a50d2 --- /dev/null +++ b/server/test/resources/test_execution_play.py @@ -0,0 +1,94 @@ +import json +import copy +import os +import pytest +from server import app +from server.test.fakedata.users import standard_user +from server.test.utils import load_json_data, error_from_response +from server.test.conftest import test_client, session +from server.resources.models.execution import ExecutionSchema +from server.test.fakedata.executions import post_valid_execution +from server.test.fakedata.pipelines import ( + PipelineStub, BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED, + BOUTIQUES_NO_SLEEP_ORIGINAL, BOUTIQUES_NO_SLEEP_CONVERTED) + + +@pytest.fixture +def pipeline_sleep(): + return PipelineStub(BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED, + "sleep.json") + + +@pytest.fixture +def pipeline_no_sleep(): + return PipelineStub(BOUTIQUES_NO_SLEEP_ORIGINAL, + BOUTIQUES_NO_SLEEP_CONVERTED, "no_sleep.json") + + +@pytest.fixture +def post_execution_sleep(test_client, pipeline_sleep): + execution_sleep = post_valid_execution(pipeline_sleep.identifier) + response = test_client.post( + '/executions', + headers={"apiKey": standard_user().api_key}, + data=json.dumps(ExecutionSchema().dump(execution_sleep).data)) + json_response = load_json_data(response) + return ExecutionSchema().load(json_response).data.identifier + + +@pytest.fixture +def post_execution_no_sleep(test_client, pipeline_no_sleep): + execution_no_sleep = post_valid_execution(pipeline_no_sleep.identifier) + response = test_client.post( + '/executions', + headers={"apiKey": standard_user().api_key}, + data=json.dumps(ExecutionSchema().dump(execution_no_sleep).data)) + json_response = load_json_data(response) + return ExecutionSchema().load(json_response).data.identifier + + +@pytest.fixture +def test_config(tmpdir_factory, session, test_client, pipeline_sleep, + pipeline_no_sleep): + session.add(standard_user(encrypted=True)) + session.commit() + + pipelines_root = tmpdir_factory.mktemp('pipelines') + pipelines_root.join(pipeline_sleep.get_converted_filename()).write( + pipeline_sleep.get_converted_json()) + pipelines_root.join(pipeline_no_sleep.get_converted_filename()).write( + pipeline_no_sleep.get_converted_json()) + + boutiques_dir = pipelines_root.mkdir('boutiques') + boutiques_dir.join(pipeline_sleep.get_original_filename()).write( + pipeline_sleep.get_original_json()) + boutiques_dir.join(pipeline_no_sleep.get_original_filename()).write( + pipeline_no_sleep.get_original_json()) + app.config['PIPELINE_DIRECTORY'] = str(pipelines_root) + + data_root = tmpdir_factory.mktemp('data') + user_dir = data_root.mkdir(standard_user().username) + user_dir.join('test.txt').write('Jane Doe') + user_execution_dir = user_dir.mkdir('executions') + app.config['DATA_DIRECTORY'] = str(data_root) + + +class TestExecutionPlayResource(): + def test_put_execution_play_fast(self, test_client, test_config, + post_execution_no_sleep): + response = test_client.get( + '/executions', headers={"apiKey": standard_user().api_key}) + response = test_client.put( + '/executions/{}/play'.format(post_execution_no_sleep), + headers={ + "apiKey": standard_user().api_key, + }) + 
assert response.status_code == 204 + + output_path = os.path.join(app.config['DATA_DIRECTORY'], + standard_user().username, 'executions', + post_execution_no_sleep, 'greeting.txt') + assert os.path.exists(output_path) + + with open(output_path) as f: + assert f.read() == 'Welcome to CARMIN-Server, Jane Doe.\n' diff --git a/server/test/resources/test_execution_results.py b/server/test/resources/test_execution_results.py new file mode 100644 index 0000000..bdff831 --- /dev/null +++ b/server/test/resources/test_execution_results.py @@ -0,0 +1,118 @@ +import json +import copy +import os +import pytest +from server import app +from server.resources.models.path import PathSchema +from server.common.error_codes_and_messages import ( + ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND, + CANNOT_GET_RESULT_NOT_COMPLETED_EXECUTION, UNAUTHORIZED) +from server.database.models.execution import ExecutionStatus +from server.test.fakedata.users import standard_user, standard_user_2 +from server.test.utils import load_json_data, error_from_response +from server.test.conftest import test_client, session +from server.resources.models.execution import ExecutionSchema +from server.test.fakedata.executions import post_valid_execution +from server.test.fakedata.pipelines import ( + PipelineStub, BOUTIQUES_NO_SLEEP_ORIGINAL, BOUTIQUES_NO_SLEEP_CONVERTED) + + +@pytest.fixture +def pipeline_no_sleep(): + return PipelineStub(BOUTIQUES_NO_SLEEP_ORIGINAL, + BOUTIQUES_NO_SLEEP_CONVERTED, "no_sleep.json") + + +@pytest.fixture +def post_execution_no_sleep(test_client, pipeline_no_sleep): + execution_no_sleep = post_valid_execution(pipeline_no_sleep.identifier) + response = test_client.post( + '/executions', + headers={"apiKey": standard_user().api_key}, + data=json.dumps(ExecutionSchema().dump(execution_no_sleep).data)) + json_response = load_json_data(response) + return ExecutionSchema().load(json_response).data.identifier + + +@pytest.fixture +def test_config(tmpdir_factory, session, test_client, pipeline_no_sleep): + session.add(standard_user(encrypted=True)) + session.add(standard_user_2(encrypted=True)) + session.commit() + + pipelines_root = tmpdir_factory.mktemp('pipelines') + pipelines_root.join(pipeline_no_sleep.get_converted_filename()).write( + pipeline_no_sleep.get_converted_json()) + + boutiques_dir = pipelines_root.mkdir('boutiques') + boutiques_dir.join(pipeline_no_sleep.get_original_filename()).write( + pipeline_no_sleep.get_original_json()) + app.config['PIPELINE_DIRECTORY'] = str(pipelines_root) + + data_root = tmpdir_factory.mktemp('data') + user_dir = data_root.mkdir(standard_user().username) + user_dir.join('test.txt').write('Jane Doe') + user_execution_dir = user_dir.mkdir('executions') + app.config['DATA_DIRECTORY'] = str(data_root) + + +@pytest.fixture +def play_execution_no_sleep(test_client, post_execution_no_sleep): + response = test_client.get( + '/executions', headers={"apiKey": standard_user().api_key}) + response = test_client.put( + '/executions/{}/play'.format(post_execution_no_sleep), + headers={ + "apiKey": standard_user().api_key, + }) + assert response.status_code == 204 + + +class TestExecutionResults(): + def test_get_results_invalid_execution_id(self, test_client, test_config, + post_execution_no_sleep): + invalid_id = "NOT_{}".format(post_execution_no_sleep) + response = test_client.get( + '/executions/{}/results'.format(invalid_id), + headers={"apiKey": standard_user().api_key}) + error = error_from_response(response) + expected_error_code_and_message = 
ErrorCodeAndMessageFormatter( + EXECUTION_NOT_FOUND, invalid_id) + assert error == expected_error_code_and_message + + def test_get_results_not_completed_execution( + self, test_client, test_config, post_execution_no_sleep): + response = test_client.get( + '/executions/{}/results'.format(post_execution_no_sleep), + headers={"apiKey": standard_user().api_key}) + error = error_from_response(response) + expected_error_code_and_message = ErrorCodeAndMessageFormatter( + CANNOT_GET_RESULT_NOT_COMPLETED_EXECUTION, + ExecutionStatus.Initializing.name) + assert error == expected_error_code_and_message + + def test_get_results_user_not_owner(self, test_client, test_config, + post_execution_no_sleep): + response = test_client.get( + '/executions/{}/results'.format(post_execution_no_sleep), + headers={"apiKey": standard_user_2().api_key}) + error = error_from_response(response) + assert error == UNAUTHORIZED + + def test_get_results_success(self, test_client, test_config, + post_execution_no_sleep, + play_execution_no_sleep): + response = test_client.get( + '/executions/{}/results'.format(post_execution_no_sleep), + headers={"apiKey": standard_user().api_key}) + assert response.status_code == 200 + paths = PathSchema(many=True).load(load_json_data(response)).data + assert len(paths) == 1 + output_file_path = os.path.relpath( + os.path.join(app.config['DATA_DIRECTORY'], + standard_user().username, "executions", + post_execution_no_sleep, BOUTIQUES_NO_SLEEP_ORIGINAL[ + "output-files"][0]["path-template"]), + app.config['DATA_DIRECTORY']) + relative_returned_path = paths[0].platform_path.split("/path/", 1)[1] + assert relative_returned_path == output_file_path diff --git a/server/test/resources/test_execution_stderr.py b/server/test/resources/test_execution_stderr.py new file mode 100644 index 0000000..43ed7f5 --- /dev/null +++ b/server/test/resources/test_execution_stderr.py @@ -0,0 +1,88 @@ +import pytest +import os +import json +from server.test.utils import load_json_data, error_from_response +from server.test.conftest import test_client, session +from server.test.fakedata.users import standard_user +from server.test.fakedata.executions import post_valid_execution +from server.test.fakedata.pipelines import PipelineStub, BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED +from server import app +from server.config import TestConfig +from server.resources.models.execution import ExecutionSchema +from server.resources.helpers.executions import get_execution_carmin_files_dir +from server.common.error_codes_and_messages import ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND, PATH_DOES_NOT_EXIST + + +@pytest.fixture +def pipeline(): + return PipelineStub(BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED, + "sleep.json") + + +@pytest.fixture(autouse=True) +def test_config(tmpdir_factory, session, pipeline): + session.add(standard_user(encrypted=True)) + session.commit() + + pipelines_root = tmpdir_factory.mktemp('pipelines') + pipelines_root.join(pipeline.get_converted_filename()).write( + pipeline.get_converted_json()) + boutiques_dir = pipelines_root.mkdir(pipeline.descriptor_type) + boutiques_dir.join(pipeline.get_original_filename()).write( + pipeline.get_original_json()) + app.config['PIPELINE_DIRECTORY'] = str(pipelines_root) + + data_root = tmpdir_factory.mktemp('data') + user_dir = data_root.mkdir(standard_user().username) + user_dir.join('test.txt').write('test file') + user_execution_dir = user_dir.mkdir('executions') + app.config['DATA_DIRECTORY'] = str(data_root) + + +@pytest.fixture +def 
execution_id(test_client, pipeline) -> str: + response = test_client.post( + '/executions', + headers={"apiKey": standard_user().api_key}, + data=json.dumps(ExecutionSchema().dump( + post_valid_execution(pipeline.identifier)).data)) + json_response = load_json_data(response) + return ExecutionSchema().load(json_response).data.identifier + + +@pytest.fixture +def write_std_err(execution_id) -> str: + carmin_dir = get_execution_carmin_files_dir(standard_user().username, + execution_id) + simple_stdout_text = "This is stderr content" + with open(os.path.join(carmin_dir, "stderr.txt"), "w") as f: + f.write(simple_stdout_text) + + return simple_stdout_text + + +class TestExecutionStdErrResource(): + def test_get_execution_std_err_by_identifier(self, test_client, + execution_id, write_std_err): + response = test_client.get( + '/executions/{}/stderr'.format(execution_id), + headers={"apiKey": standard_user().api_key}) + assert response.data.decode('utf8') == write_std_err + + def test_get_execution_std_err_not_found(self, test_client, execution_id): + response = test_client.get( + '/executions/{}/stderr'.format(execution_id), + headers={"apiKey": standard_user().api_key}) + error = error_from_response(response) + assert error == PATH_DOES_NOT_EXIST + + def test_get_execution_std_err_invalid_execution_id( + self, test_client, execution_id, write_std_err): + invalid_execution_id = "NOT_{}".format(execution_id) + response = test_client.get( + '/executions/{}/stderr'.format(invalid_execution_id), + headers={"apiKey": standard_user().api_key}) + error = error_from_response(response) + expected_error_code_and_message = ErrorCodeAndMessageFormatter( + EXECUTION_NOT_FOUND, invalid_execution_id) + assert error == expected_error_code_and_message diff --git a/server/test/resources/test_execution_stdout.py b/server/test/resources/test_execution_stdout.py index 55c3a80..5dbfbf4 100644 --- a/server/test/resources/test_execution_stdout.py +++ b/server/test/resources/test_execution_stdout.py @@ -1,52 +1,88 @@ import pytest -import copy import os import json from server.test.utils import load_json_data, error_from_response from server.test.conftest import test_client, session from server.test.fakedata.users import standard_user +from server.test.fakedata.executions import post_valid_execution +from server.test.fakedata.pipelines import PipelineStub, BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED from server import app from server.config import TestConfig -from server.resources.models.error_code_and_message import ErrorCodeAndMessageSchema -from server.common.error_codes_and_messages import INVALID_PIPELINE_IDENTIFIER -from server.resources.models.pipeline import PipelineSchema -from server.test.fakedata.executions import execution_for_db +from server.resources.models.execution import ExecutionSchema +from server.resources.helpers.executions import get_execution_carmin_files_dir +from server.common.error_codes_and_messages import ErrorCodeAndMessageFormatter, EXECUTION_NOT_FOUND, PATH_DOES_NOT_EXIST -EXECUTION_IDENTIFIER = "test-execution-identifier" + +@pytest.fixture +def pipeline(): + return PipelineStub(BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED, + "sleep.json") @pytest.fixture(autouse=True) -def user_execution_folder(tmpdir_factory): - root_directory = tmpdir_factory.mktemp('data') - subdir = root_directory.mkdir(standard_user().username) - user_executions = subdir.mkdir('executions') - execution_folder = user_executions.mkdir(EXECUTION_IDENTIFIER) - app.config['DATA_DIRECTORY'] = 
str(root_directory) +def test_config(tmpdir_factory, session, pipeline): + session.add(standard_user(encrypted=True)) + session.commit() - return execution_folder + pipelines_root = tmpdir_factory.mktemp('pipelines') + pipelines_root.join(pipeline.get_converted_filename()).write( + pipeline.get_converted_json()) + boutiques_dir = pipelines_root.mkdir(pipeline.descriptor_type) + boutiques_dir.join(pipeline.get_original_filename()).write( + pipeline.get_original_json()) + app.config['PIPELINE_DIRECTORY'] = str(pipelines_root) + data_root = tmpdir_factory.mktemp('data') + user_dir = data_root.mkdir(standard_user().username) + user_dir.join('test.txt').write('test file') + user_execution_dir = user_dir.mkdir('executions') + app.config['DATA_DIRECTORY'] = str(data_root) -@pytest.fixture(autouse=True) -def data_creation(session): - user = standard_user(encrypted=True) - session.add(user) - session.commit() - execution = execution_for_db(EXECUTION_IDENTIFIER, user.username) - session.add(execution) - session.commit() +@pytest.fixture +def execution_id(test_client, pipeline) -> str: + response = test_client.post( + '/executions', + headers={"apiKey": standard_user().api_key}, + data=json.dumps(ExecutionSchema().dump( + post_valid_execution(pipeline.identifier)).data)) + json_response = load_json_data(response) + return ExecutionSchema().load(json_response).data.identifier + + +@pytest.fixture +def write_std_out(execution_id) -> str: + carmin_dir = get_execution_carmin_files_dir(standard_user().username, + execution_id) + simple_stdout_text = "This is stdout content" + with open(os.path.join(carmin_dir, "stdout.txt"), "w") as f: + f.write(simple_stdout_text) + + return simple_stdout_text class TestExecutionStdOutResource(): def test_get_execution_std_out_by_identifier(self, test_client, - user_execution_folder): - pass - # simple_stdout_text = "This is stdout content" - # user_execution_folder.join("stdout.txt").write(simple_stdout_text) - - # response = test_client.get( - # '/executions/{}/stdout'.format(EXECUTION_IDENTIFIER), - # headers={ - # "apiKey": standard_user().api_key - # }) - # assert response.data.decode('utf8') == simple_stdout_text + execution_id, write_std_out): + response = test_client.get( + '/executions/{}/stdout'.format(execution_id), + headers={"apiKey": standard_user().api_key}) + assert response.data.decode('utf8') == write_std_out + + def test_get_execution_std_out_not_found(self, test_client, execution_id): + response = test_client.get( + '/executions/{}/stdout'.format(execution_id), + headers={"apiKey": standard_user().api_key}) + error = error_from_response(response) + assert error == PATH_DOES_NOT_EXIST + + def test_get_execution_std_out_invalid_execution_id( + self, test_client, execution_id, write_std_out): + invalid_execution_id = "NOT_{}".format(execution_id) + response = test_client.get( + '/executions/{}/stdout'.format(invalid_execution_id), + headers={"apiKey": standard_user().api_key}) + error = error_from_response(response) + expected_error_code_and_message = ErrorCodeAndMessageFormatter( + EXECUTION_NOT_FOUND, invalid_execution_id) + assert error == expected_error_code_and_message diff --git a/server/test/resources/test_executions.py b/server/test/resources/test_executions.py index 8b9936b..05897ea 100644 --- a/server/test/resources/test_executions.py +++ b/server/test/resources/test_executions.py @@ -1,5 +1,9 @@ import copy import os +try: + from os import scandir, walk +except ImportError: + from scandir import scandir, walk import json import pytest from 
server import app @@ -8,57 +12,78 @@ INVALID_MODEL_PROVIDED, INVALID_INPUT_FILE, INVALID_QUERY_PARAMETER) from server.resources.models.pipeline import PipelineSchema from server.resources.models.execution import ExecutionSchema -from server.test.fakedata.pipelines import PIPELINE_FOUR +from server.test.fakedata.pipelines import PipelineStub, BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED from server.test.fakedata.executions import ( - POST_VALID_EXECUTION, POST_INVALID_EXECUTION_FILE_NOT_EXIST, - POST_INVALID_EXECUTION_ARRAY_FILE_NOT_EXIST, POST_INVALID_IDENTIFIER_SET, + post_valid_execution, post_invalid_execution_file_not_exist, + post_invalid_execution_array_file_not_exist, post_invalid_identifier_set, POST_INVALID_EXECUTION_IDENTIFIER_NOT_EXIST, POST_INVALID_MODEL) from server.test.fakedata.users import standard_user from server.test.utils import load_json_data, error_from_response from server.test.conftest import test_client, session +from server.resources.helpers.executions import INPUTS_FILENAME, DESCRIPTOR_FILENAME + + +@pytest.fixture +def pipeline(): + return PipelineStub(BOUTIQUES_SLEEP_ORIGINAL, BOUTIQUES_SLEEP_CONVERTED, + "sleep.json") @pytest.fixture(autouse=True) -def test_config(tmpdir_factory, session): +def test_config(tmpdir_factory, session, pipeline): session.add(standard_user(encrypted=True)) session.commit() pipelines_root = tmpdir_factory.mktemp('pipelines') + pipelines_root.join(pipeline.get_converted_filename()).write( + pipeline.get_converted_json()) + boutiques_dir = pipelines_root.mkdir(pipeline.descriptor_type) + boutiques_dir.join(pipeline.get_original_filename()).write( + pipeline.get_original_json()) + app.config['PIPELINE_DIRECTORY'] = str(pipelines_root) + data_root = tmpdir_factory.mktemp('data') - pipelines_root.join('pipeline1.json').write( - json.dumps(PipelineSchema().dump(PIPELINE_FOUR).data)) user_dir = data_root.mkdir(standard_user().username) user_dir.join('test.txt').write('test file') user_execution_dir = user_dir.mkdir('executions') app.config['DATA_DIRECTORY'] = str(data_root) - app.config['PIPELINE_DIRECTORY'] = str(pipelines_root) @pytest.fixture -def number_of_executions(test_client) -> int: +def number_of_executions(test_client, pipeline) -> int: number_of_executions = 10 for _ in range(number_of_executions): test_client.post( '/executions', headers={"apiKey": standard_user().api_key}, - data=json.dumps(ExecutionSchema().dump(POST_VALID_EXECUTION).data)) + data=json.dumps(ExecutionSchema().dump( + post_valid_execution(pipeline.identifier)).data)) return number_of_executions class TestExecutionsResource(): # tests for POST - def test_post_valid_execution(self, test_client): + def test_post_valid_execution(self, test_client, pipeline): user_execution_dir = os.path.join(app.config['DATA_DIRECTORY'], standard_user().username, 'executions') response = test_client.post( '/executions', headers={"apiKey": standard_user().api_key}, - data=json.dumps(ExecutionSchema().dump(POST_VALID_EXECUTION).data)) - assert os.listdir(user_execution_dir) + data=json.dumps(ExecutionSchema().dump( + post_valid_execution(pipeline.identifier)).data)) assert response.status_code == 200 - def test_post_file_doesnt_exist(self, test_client): + json_response = load_json_data(response) + execution = ExecutionSchema().load(json_response).data + execution_dir = os.path.join(user_execution_dir, execution.identifier) + carmin_files_dir = os.path.join(execution_dir, ".carmin-files") + assert os.path.isdir(execution_dir) + assert os.path.isdir(carmin_files_dir) + assert 
INPUTS_FILENAME in os.listdir(carmin_files_dir) + assert DESCRIPTOR_FILENAME in os.listdir(carmin_files_dir) + + def test_post_file_doesnt_exist(self, test_client, pipeline): user_execution_dir = os.path.join(app.config['DATA_DIRECTORY'], standard_user().username, 'executions') @@ -66,34 +91,35 @@ def test_post_file_doesnt_exist(self, test_client): '/executions', headers={"apiKey": standard_user().api_key}, data=json.dumps(ExecutionSchema().dump( - POST_INVALID_EXECUTION_FILE_NOT_EXIST).data)) + post_invalid_execution_file_not_exist( + pipeline.identifier)).data)) assert not os.listdir(user_execution_dir) assert response.status_code == 400 - def test_post_array_file_doesnt_exist(self, test_client): + def test_post_array_file_doesnt_exist(self, test_client, pipeline): user_execution_dir = os.path.join(app.config['DATA_DIRECTORY'], standard_user().username, 'executions') + execution = post_invalid_execution_array_file_not_exist( + pipeline.identifier) response = test_client.post( '/executions', headers={"apiKey": standard_user().api_key}, - data=json.dumps(ExecutionSchema().dump( - POST_INVALID_EXECUTION_ARRAY_FILE_NOT_EXIST).data)) + data=json.dumps(ExecutionSchema().dump(execution).data)) error_code_and_message = error_from_response(response) expected_error_code_and_message = copy.deepcopy(INVALID_INPUT_FILE) expected_error_code_and_message.error_message = expected_error_code_and_message.error_message.format( - *POST_INVALID_EXECUTION_ARRAY_FILE_NOT_EXIST. - input_values["file_input"]) + *execution.input_values["input_file"]) assert not os.listdir(user_execution_dir) assert error_code_and_message == expected_error_code_and_message - def test_post_identifier_set(self, test_client): + def test_post_identifier_set(self, test_client, pipeline): response = test_client.post( '/executions', headers={"apiKey": standard_user().api_key}, - data=json.dumps( - ExecutionSchema().dump(POST_INVALID_IDENTIFIER_SET).data)) + data=json.dumps(ExecutionSchema().dump( + post_invalid_identifier_set(pipeline.identifier)).data)) error = error_from_response(response) assert error == EXECUTION_IDENTIFIER_MUST_NOT_BE_SET @@ -150,13 +176,8 @@ def test_get_with_invalid_offset(self, test_client, number_of_executions): headers={ "apiKey": standard_user().api_key }) - error = error_from_response(response) - - expected_error_code_and_message = copy.deepcopy( - INVALID_QUERY_PARAMETER) - expected_error_code_and_message.error_message = expected_error_code_and_message.error_message.format( - offset, 'offset') - assert error == expected_error_code_and_message + json_response = load_json_data(response) + assert len(json_response) == number_of_executions def test_get_with_offset_greater_than_execution_count( self, test_client, number_of_executions): @@ -176,13 +197,8 @@ def test_get_with_negative_offset(self, test_client, number_of_executions): headers={ "apiKey": standard_user().api_key }) - error = error_from_response(response) - - expected_error_code_and_message = copy.deepcopy( - INVALID_QUERY_PARAMETER) - expected_error_code_and_message.error_message = expected_error_code_and_message.error_message.format( - offset, 'offset') - assert error == expected_error_code_and_message + json_response = load_json_data(response) + assert len(json_response) == number_of_executions def test_get_with_limit(self, test_client, number_of_executions): limit = 4 @@ -201,13 +217,8 @@ def test_get_with_invalid_limit(self, test_client, number_of_executions): headers={ "apiKey": standard_user().api_key }) - error = error_from_response(response) - 
- expected_error_code_and_message = copy.deepcopy( - INVALID_QUERY_PARAMETER) - expected_error_code_and_message.error_message = expected_error_code_and_message.error_message.format( - limit, 'limit') - assert error == expected_error_code_and_message + json_response = load_json_data(response) + assert len(json_response) == number_of_executions def test_get_with_negative_limit(self, test_client, number_of_executions): limit = -10 @@ -216,11 +227,5 @@ def test_get_with_negative_limit(self, test_client, number_of_executions): headers={ "apiKey": standard_user().api_key }) - - error = error_from_response(response) - - expected_error_code_and_message = copy.deepcopy( - INVALID_QUERY_PARAMETER) - expected_error_code_and_message.error_message = expected_error_code_and_message.error_message.format( - limit, 'limit') - assert error == expected_error_code_and_message + json_response = load_json_data(response) + assert len(json_response) == number_of_executions diff --git a/server/test/resources/test_executions_count.py b/server/test/resources/test_executions_count.py index 6c0ea55..43478e0 100644 --- a/server/test/resources/test_executions_count.py +++ b/server/test/resources/test_executions_count.py @@ -1,4 +1,4 @@ -from server.test.resources.test_executions import test_config, number_of_executions +from server.test.resources.test_executions import test_config, number_of_executions, pipeline from server.test.fakedata.users import standard_user from server.test.utils import load_json_data from server.test.conftest import test_client, session @@ -7,16 +7,12 @@ class TestExecutionsCountResource(): def test_get_0_executions(self, test_client): response = test_client.get( - '/executions/count', headers={ - "apiKey": standard_user().api_key - }) + '/executions/count', headers={"apiKey": standard_user().api_key}) json_response = load_json_data(response) assert json_response == 0 def test_get_10_executions(self, test_client, number_of_executions): response = test_client.get( - '/executions/count', headers={ - "apiKey": standard_user().api_key - }) + '/executions/count', headers={"apiKey": standard_user().api_key}) json_response = load_json_data(response) assert json_response == number_of_executions diff --git a/server/test/resources/test_pipeline_boutiquesdescriptor.py b/server/test/resources/test_pipeline_boutiquesdescriptor.py index 1c01071..a645f0d 100644 --- a/server/test/resources/test_pipeline_boutiquesdescriptor.py +++ b/server/test/resources/test_pipeline_boutiquesdescriptor.py @@ -28,7 +28,7 @@ def data_tester(tmpdir_factory): @pytest.fixture(autouse=True) -def user_creater(session): +def user_creator(session): session.add(standard_user(encrypted=True)) session.commit() @@ -38,9 +38,7 @@ def test_get_pipeline_boutiques_descriptor_by_identifier( self, test_client): response = test_client.get( '/pipelines/{}/boutiquesdescriptor'.format(PipelineOne.identifier), - headers={ - "apiKey": standard_user().api_key - }) + headers={"apiKey": standard_user().api_key}) pipeline = load_json_data(response) original_pipeline = json.loads( PipelineSchema().dumps(PipelineOne).data) @@ -52,8 +50,6 @@ def test_get_pipeline_boutiques_descriptor_by_identifier_invalid_id( response = test_client.get( '/pipelines/{}/boutiquesdescriptor'.format("INVALID_{}".format( PipelineOne.identifier)), - headers={ - "apiKey": standard_user().api_key - }) + headers={"apiKey": standard_user().api_key}) error = error_from_response(response) assert error == INVALID_PIPELINE_IDENTIFIER diff --git a/server/test/resources/test_platform.py 
b/server/test/resources/test_platform.py index 0925102..eed8425 100644 --- a/server/test/resources/test_platform.py +++ b/server/test/resources/test_platform.py @@ -45,12 +45,6 @@ def test_missing_required_parameter(config_data): properties_validation(config_data) -def test_invalid_protocol(config_data): - config_data['supportedTransferProtocols'].append("invalidProtocol") - with pytest.raises(ValueError): - properties_validation(config_data) - - def test_max_authorized_execution_timeout_greater_than_min(config_data): config_data['minAuthorizedExecutionTimeout'] = 1024 config_data['maxAuthorizedExecutionTimeout'] = 64 diff --git a/setup.py b/setup.py index 22ba77f..9123bcf 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,8 @@ "flask-restful>=0.3.6,<1.0", "flask-sqlalchemy>-2.3.2,<3.0", "psycopg2-binary>=2.7.4,<3.0", "marshmallow>=2.15.0,<3.0", "marshmallow_enum>=1.4.1,<2.0", "boutiques>=0.5.6,<1.0", - "blinker>=1.4,<2.0", "typing>=3.6.4,<4.0", "scandir>=1.7,<2.0" + "blinker>=1.4,<2.0", "typing>=3.6.4,<4.0", "scandir>=1.7,<2.0", + "psutil>=5.4.5,<6.0" ] setup( From 993d1b332ac29d79aa0dab341661bb783eaf39c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Louis-Olivier=20Gu=C3=A9rin?= Date: Mon, 30 Apr 2018 19:22:03 -0400 Subject: [PATCH 5/6] Path upload raw (#20) * Solving async process remaining alive * Add descriptor factory and ABC * Execution successful * Implement boutiques concrete class * Use abstract factory to export pipelines * Return False on error * Timeout management and errors on insert * Use abstract factory for execute * Pipeline timeout and DB better management * Preventive commit * Changes for the demo * Remove print execution kill and add comment to execution post * Get original descriptor also return the type with the path * Fix rename and Added descriptor to DB Execution model * Missing comma * Fixing pipeline test setup * Fix tuple None * Descriptor copied to execution folder * Test modified. 
Taking into account the Descriptor copy and inputs file * Execution using local descriptor and Abstract descriptor class * Should work entirely for the purge * Changed purged execution status to Unknown * Added forgotten import * Create directories for all supported descriptors * Begin get results and move all carmin files to execution subfolder * Move carmin related files to a subfolder of the execution * Make carmin-files directory hidden * Safety push (VM is getting slow) * Make pipeline identifier path to pipeline * Added execution id to Path * Write absolute paths file to .carmin-files * Added "Corrupted" error message for execution * Removing error line * Two quick fixes * Removed useless pass, added execution delete and more comments * Fix process killing with docker * Make call to sh kill instead of using python kill * Add upload raw data * Add some error handling * Add test for raw file upload * Add comments --- README.md | 8 +--- server/resources/helpers/path.py | 4 ++ server/resources/path.py | 60 ++++++++++++++++++++---------- server/test/resources/test_path.py | 39 ++++++++++++++++--- 4 files changed, 79 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 3377200..d6cb0e8 100644 --- a/README.md +++ b/README.md @@ -161,12 +161,8 @@ Let's add some data to the server with the `PUT /path/{completePath}` method: ```bash curl -X "PUT" "http://localhost:8080/path/admin/new_user.txt" \ -H 'apiKey: [secret-api-key]' \ - -d $'{ - "type": "File", - "base64Content": "bmV3IENBUk1JTiB1c2VyCg==" -}' + -d "Jane Doe" ``` - The server should reply with a `201: Created` code, indicating that the resource was successfully uploaded to the server. @@ -186,7 +182,7 @@ The server should return a `Path` object, which describes the resource that we u "platformPath": "http://localhost:8080/path/admin/new_user.txt", "lastModificationDate": 1521740108, "isDirectory": false, - "size": 12, + "size": 8, "mimeType": "text/plain" } ``` diff --git a/server/resources/helpers/path.py b/server/resources/helpers/path.py index ee83798..fc2d01f 100644 --- a/server/resources/helpers/path.py +++ b/server/resources/helpers/path.py @@ -131,6 +131,10 @@ def upload_archive(upload_data: UploadData, with zipfile.ZipFile(file_name, mode='r') as zf: zf.extractall(path=requested_dir_path) except zipfile.BadZipFile as e: + try: + os.remove(file_name) + except: + pass return None, ErrorCodeAndMessageFormatter(NOT_AN_ARCHIVE, e) os.remove(file_name) path = Path.object_from_pathname(requested_dir_path) diff --git a/server/resources/path.py b/server/resources/path.py index 63f4c8e..e4d731a 100644 --- a/server/resources/path.py +++ b/server/resources/path.py @@ -5,7 +5,8 @@ from flask import Response, make_response from server.common.utils import marshal from server.common.error_codes_and_messages import ( - ErrorCodeAndMessageFormatter, UNAUTHORIZED, INVALID_PATH, INVALID_ACTION, + ErrorCodeAndMessageFormatter, ErrorCodeAndMessageAdditionalDetails, + INVALID_MODEL_PROVIDED, UNAUTHORIZED, INVALID_PATH, INVALID_ACTION, MD5_ON_DIR, LIST_ACTION_ON_FILE, ACTION_REQUIRED, UNEXPECTED_ERROR, PATH_IS_DIRECTORY, INVALID_REQUEST, PATH_DOES_NOT_EXIST) from .models.upload_data import UploadDataSchema @@ -68,14 +69,49 @@ def get(self, user, complete_path: str = ''): return marshal(INVALID_ACTION), 400 @login_required - @unmarshal_request(UploadDataSchema(), allow_none=True) - def put(self, user, model, complete_path: str = ''): + def put(self, user, complete_path: str = ''): + data = request.data requested_data_path = 
make_absolute(complete_path) if not is_safe_for_put(requested_data_path, user): return marshal(INVALID_PATH), 401 - if not model: + if request.headers.get( + 'Content-Type', + default='').lower() == 'application/carmin+json' and data: + # Request data contains base64 encoding of file or archive + data = request.get_json(force=True, silent=True) + model, error = UploadDataSchema().load(data) + if error: + return marshal( + ErrorCodeAndMessageAdditionalDetails( + INVALID_MODEL_PROVIDED, error)), 400 + if model.upload_type == "File": + if os.path.isdir(requested_data_path): + error = ErrorCodeAndMessageFormatter( + PATH_IS_DIRECTORY, complete_path) + return marshal(error), 400 + path, error = upload_file(model, requested_data_path) + if error: + return marshal(error), 400 + return marshal(path), 201 + + if model.upload_type == "Archive": + path, error = upload_archive(model, requested_data_path) + if error: + return marshal(error), 400 + return marshal(path), 201 + if data: + # Content-Type is not 'application/carmin+json', + # request data is taken as raw text + try: + with open(requested_data_path, 'w') as f: + f.write(data.decode('utf-8', errors='ignore')) + return marshal( + PathModel.object_from_pathname(requested_data_path)), 201 + except OSError: + return marshal(INVALID_PATH), 400 + if not data: path, error = create_directory(requested_data_path) if error: return marshal(error), 400 @@ -83,22 +119,6 @@ def put(self, user, model, complete_path: str = ''): string_path = json.dumps(PathSchema().dump(path).data) return make_response((string_path, 201, file_location_header)) - if model.upload_type == "File": - if os.path.isdir(requested_data_path): - error = ErrorCodeAndMessageFormatter(PATH_IS_DIRECTORY, - complete_path) - return marshal(error), 400 - path, error = upload_file(model, requested_data_path) - if error: - return marshal(error), 400 - return marshal(path), 201 - - if model.upload_type == "Archive": - path, error = upload_archive(model, requested_data_path) - if error: - return marshal(error), 400 - return marshal(path), 201 - return marshal(INVALID_REQUEST), 400 @login_required diff --git a/server/test/resources/test_path.py b/server/test/resources/test_path.py index f442655..856447c 100644 --- a/server/test/resources/test_path.py +++ b/server/test/resources/test_path.py @@ -245,7 +245,10 @@ def test_put_outside_authorized_directory(self, test_client): def test_put_with_invalid_upload_type(self, test_client): response = test_client.put( '/path/{}/new_file.txt'.format(standard_user().username), - headers={"apiKey": standard_user().api_key}, + headers={ + "apiKey": standard_user().api_key, + "Content-Type": "application/carmin+json" + }, data='{"type": "Invented", "base64Content": "ewlfkjweflk=="}') error = error_from_response(response) assert error.error_code == INVALID_MODEL_PROVIDED.error_code @@ -253,7 +256,6 @@ def test_put_with_invalid_upload_type(self, test_client): assert len(error.error_detail) == 1 assert "type" in error.error_detail - def test_put_where_parent_dir_not_exist(self, test_client): response = test_client.put( '/path/{}/made_up_dir/file.txt'.format(standard_user().username), @@ -288,7 +290,10 @@ def test_put_base64_file(self, test_client, put_file): file_name = '{}/put_file.txt'.format(standard_user().username) response = test_client.put( '/path/{}'.format(file_name), - headers={"apiKey": standard_user().api_key}, + headers={ + "apiKey": standard_user().api_key, + "Content-Type": "application/carmin+json" + }, 
data=json.dumps(UploadDataSchema().dump(put_file).data)) assert os.path.exists( os.path.join(app.config['DATA_DIRECTORY'], file_name)) @@ -298,7 +303,10 @@ def test_put_base64_dir(self, test_client, put_dir): abs_path = os.path.join(app.config['DATA_DIRECTORY'], path) response = test_client.put( '/path/{}'.format(path), - headers={"apiKey": standard_user().api_key}, + headers={ + "apiKey": standard_user().api_key, + "Content-Type": "application/carmin+json" + }, data=json.dumps(UploadDataSchema().dump(put_dir).data)) assert response.status_code == 201 and os.listdir(abs_path) @@ -309,7 +317,10 @@ def test_put_base64_invalid_dir(self, test_client): base64_content='bad_content', upload_type='Archive', md5='') response = test_client.put( '/path/{}'.format(path), - headers={"apiKey": standard_user().api_key}, + headers={ + "apiKey": standard_user().api_key, + "Content-Type": "application/carmin+json" + }, data=json.dumps(UploadDataSchema().dump(put_dir).data)) assert response.status_code == 400 @@ -319,12 +330,28 @@ def test_put_file_on_dir(self, test_client): base64_content='bad_content', upload_type='File', md5='') response = test_client.put( '/path/{}'.format(path), - headers={"apiKey": standard_user().api_key}, + headers={ + "apiKey": standard_user().api_key, + "Content-Type": "application/carmin+json" + }, data=json.dumps(UploadDataSchema().dump(put_dir).data)) error = error_from_response(response) assert error.error_message == "Invalid path: '{}' is a directory.".format( path) + def test_put_file_raw(self, test_client): + path = '{}/test_file_raw.txt'.format(standard_user().username) + file_content = "This is a test raw file" + response = test_client.put( + '/path/{}'.format(path), + headers={"apiKey": standard_user().api_key}, + data=file_content) + assert response.status_code == 201 + + file_path = os.path.join(app.config['DATA_DIRECTORY'], path) + with open(file_path) as f: + assert f.read() == file_content + # tests for DELETE def test_delete_single_file(self, test_client): file_to_delete = "{}/file.json".format(standard_user().username) From a2e94ba2dc759f67d8e34b7b6ada8d7c3b0537db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Louis-Olivier=20Gu=C3=A9rin?= Date: Mon, 30 Apr 2018 21:23:45 -0400 Subject: [PATCH 6/6] Add CLI (#22) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * First draft of CLI * Add config file * Fix error * Fix another error * Catch file doesnt exist error * Catch other error * Handle keyboard interrupts * Change wording * Add shebang to entrypoint * Fix typo * Add —container flag * Add port option to cli * Rename install to setup --- .gitignore | 1 + VERSION | 1 + cli/carmin_server.py | 51 ++++++++++++++++++++++++++++++ cli/carmin_server_run.py | 51 ++++++++++++++++++++++++++++++ cli/carmin_server_setup.py | 63 ++++++++++++++++++++++++++++++++++++++ cli/cli_helper.py | 5 +++ server/__init__.py | 12 +++++++- setup.py | 14 +++++++-- 8 files changed, 194 insertions(+), 4 deletions(-) create mode 100644 VERSION create mode 100755 cli/carmin_server.py create mode 100644 cli/carmin_server_run.py create mode 100644 cli/carmin_server_setup.py create mode 100644 cli/cli_helper.py diff --git a/.gitignore b/.gitignore index 9069324..cb166dc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .pytest_cache/* *.db /carmin-server/.coverage +/CONFIG.json diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..49d5957 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.1 diff --git a/cli/carmin_server.py b/cli/carmin_server.py new 
file mode 100755 index 0000000..8cc5106 --- /dev/null +++ b/cli/carmin_server.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +"""CARMIN-Server + +A lightweight server for the execution of remote pipelines. + +Usage: + carmin-server [--help] [--version] COMMAND [OPTIONS...] + +Options: + -h, --help Print help page and quit + -v, --version Print version information and quit + +Commands: + setup Install and configure the server + run Launch the server + """ + +from subprocess import call +from pathlib import Path +from docopt import docopt +from cli_helper import project_root + + +def get_version(): + root_path = project_root() + version_file = open(Path(root_path, 'VERSION')) + return version_file.read().strip() + + +if __name__ == '__main__': + args = docopt(__doc__, options_first=True, version=get_version()) + + argv = [args['COMMAND']] + args['OPTIONS'] + + if args['COMMAND'] == 'setup': + import carmin_server_setup + try: + exit(call(['python3', 'carmin_server_setup.py'] + argv)) + except KeyboardInterrupt: + exit() + elif args['COMMAND'] == 'run': + import carmin_server_run + try: + exit(call(['python3', 'carmin_server_run.py'] + argv)) + except KeyboardInterrupt: + pass + elif args['COMMAND'] in ['help', None]: + exit(call(['python3', 'carmin_server.py', '--help'])) + else: + exit("{} is not a carmin-server command. See 'carmin-server --help.'". + format(args['COMMAND'])) diff --git a/cli/carmin_server_run.py b/cli/carmin_server_run.py new file mode 100644 index 0000000..dc487be --- /dev/null +++ b/cli/carmin_server_run.py @@ -0,0 +1,51 @@ +"""Usage: carmin-server run [options] + +Launches the server + +Options: + -p , --port The server will listen on this port + -c, --container Launch the server inside a Docker container + """ + +import json +from subprocess import call +from pathlib import Path +from docopt import docopt + +from cli_helper import project_root + + +def config_dict(): + root_dir = Path(__file__).resolve().parent.parent + config_file = Path(root_dir, 'CONFIG.json') + try: + with open(config_file) as f: + return json.load(f) + except FileNotFoundError: + return {} + except TypeError: + return {} + + +CONFIG = config_dict() + +if __name__ == '__main__': + args = docopt(__doc__) + port = args.get('--port') or '8080' + try: + port = int(port) + except ValueError: + print("Invalid port number. 
Port must be an integer.") + exit(1) + if args.get('--container'): + call(['docker', 'build', '-t=carmin-server', '..']) + call([ + 'docker', 'run', '-p', '{}:8080'.format(port), '-e', + 'DATABASE_URI="sqlite:////carmin-db/app.db"', '-v', + '{}:/carmin-assets/pipelines'.format( + CONFIG.get('PIPELINE_DIRECTORY')), + '-v', '{}:/carmin-assets/data'.format( + CONFIG.get('DATA_DIRECTORY')), 'carmin-server' + ]) + else: + call(['python3', '-m', 'server', str(port)], cwd=project_root()) diff --git a/cli/carmin_server_setup.py b/cli/carmin_server_setup.py new file mode 100644 index 0000000..a700bf8 --- /dev/null +++ b/cli/carmin_server_setup.py @@ -0,0 +1,63 @@ +"""Usage: carmin-server setup [options] + +Options: + -p , --pipeline-directory Specify path for pipeline directory + -d , --data-directory Specify path for data directory + -w , --database Specify path for database + +""" + +import json +from pathlib import Path +from docopt import docopt + + +def is_interactive(invocation_list): + return not (invocation_list.get('--database') + and invocation_list.get('--pipeline-directory') + and invocation_list.get('--data-directory')) + + +def write_to_config_file(config): + root_dir = Path(__file__).resolve().parent.parent + config_file = Path(root_dir, 'CONFIG.json') + with open(config_file, 'w') as f: + json.dump(config, f) + + +def print_install_banner(): + width = 50 + delimiter = '-' * width + print('{0}\nCARMIN-Server Setup (Press CTRL-C to quit)\n{0}'.format( + delimiter)) + + +ask_pipeline = "Enter path to pipeline directory: " +ask_data = "Enter path to data directory: " +ask_database = "Enter path or URI to the database (to use the default sqlite database, leave blank): " + +if __name__ == '__main__': + args = docopt(__doc__) + try: + if is_interactive(args): + print_install_banner() + step_count = 1 + if not args.get('--pipeline-directory'): + pipeline_path = input('{}. {}'.format(step_count, + ask_pipeline)) + step_count += 1 + if not args.get('--data-directory'): + data_path = input('{}. {}'.format(step_count, ask_data)) + step_count += 1 + if not args.get('--database'): + database_path = input('{}. {}'.format(step_count, + ask_database)) + config_dict = { + "PIPELINE_DIRECTORY": pipeline_path, + "DATA_DIRECTORY": data_path, + "DATABASE_URL": database_path + } + write_to_config_file(config_dict) + exit("\nCARMIN-Server was successfully configured.") + except KeyboardInterrupt: + exit() diff --git a/cli/cli_helper.py b/cli/cli_helper.py new file mode 100644 index 0000000..267e85a --- /dev/null +++ b/cli/cli_helper.py @@ -0,0 +1,5 @@ +from pathlib import Path + + +def project_root(): + return Path(__file__).resolve().parent.parent diff --git a/server/__init__.py b/server/__init__.py index a16f578..cfedcda 100644 --- a/server/__init__.py +++ b/server/__init__.py @@ -1,3 +1,4 @@ +import sys from server.server_helper import create_app from server.api import declare_api @@ -10,7 +11,16 @@ def main(): declare_api(app) start_up() - app.run(host='0.0.0.0', port=int(8080)) + if len(sys.argv) > 1: + port = sys.argv[1] + try: + port = int(port) + except ValueError: + print("Invalid port number. 
Port must be an integer.") + exit(1) + else: + port = 8080 + app.run(host='0.0.0.0', port=port) from server.startup_validation import start_up diff --git a/setup.py b/setup.py index 9123bcf..8cbe932 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,21 @@ +import os from setuptools import setup, find_packages + +def get_version(): + version_file = open( + os.path.join(os.path.dirname(os.path.realpath(__file__)), 'VERSION')) + return version_file.read().strip() + + NAME = "carmin-server" -VERSION = "0.1" +VERSION = get_version() DEPS = [ "flask-restful>=0.3.6,<1.0", "flask-sqlalchemy>-2.3.2,<3.0", "psycopg2-binary>=2.7.4,<3.0", "marshmallow>=2.15.0,<3.0", "marshmallow_enum>=1.4.1,<2.0", "boutiques>=0.5.6,<1.0", - "blinker>=1.4,<2.0", "typing>=3.6.4,<4.0", "scandir>=1.7,<2.0", - "psutil>=5.4.5,<6.0" + "blinker>=1.4,<2.0", "docopt>=0.6.2,<1.0", "typing>=3.6.4,<4.0", + "scandir>=1.7,<2.0", "psutil>=5.4.5,<6.0" ] setup(
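
The stdout/stderr tests above only exercise the new endpoints through the Flask test client. As a quick reference, they read the captured `stdout.txt` / `stderr.txt` for an execution and serve the content as `text/plain`. A minimal sketch with curl, assuming a local server on port 8080 as in the README examples and an identifier previously returned by `POST /executions`:

```bash
# <execution-identifier> is the "identifier" field returned by POST /executions
curl "http://localhost:8080/executions/<execution-identifier>/stdout" \
  -H 'apiKey: [secret-api-key]'

curl "http://localhost:8080/executions/<execution-identifier>/stderr" \
  -H 'apiKey: [secret-api-key]'
```

If the execution has produced no output yet, the server answers with the `PATH_DOES_NOT_EXIST` error, and an unknown identifier yields `EXECUTION_NOT_FOUND`, matching the new tests.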
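The reworked `PUT /path/{completePath}` from `Path upload raw (#20)` now branches on the `Content-Type` header: a body sent as `application/carmin+json` is parsed as an `UploadData` payload (base64 `File` or `Archive`), any other non-empty body is written verbatim to the target path, and an empty body creates a directory. A sketch of the two upload modes with curl, reusing the host, API-key placeholder and example content from the README:

```bash
# Raw upload: the request body is written to the file as-is
curl -X "PUT" "http://localhost:8080/path/admin/new_user.txt" \
  -H 'apiKey: [secret-api-key]' \
  -d "Jane Doe"

# Base64 upload: the carmin+json Content-Type selects the UploadData branch
curl -X "PUT" "http://localhost:8080/path/admin/new_user.txt" \
  -H 'apiKey: [secret-api-key]' \
  -H 'Content-Type: application/carmin+json' \
  -d '{"type": "File", "base64Content": "bmV3IENBUk1JTiB1c2VyCg=="}'
```

Both variants answer with `201 Created` and a `Path` object describing the uploaded resource.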
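Finally, `Add CLI (#22)` wires a docopt-based front end around the server: `setup` records the pipeline, data and database locations in a `CONFIG.json` at the project root (prompting interactively for anything not given on the command line), and `run` starts the server, optionally inside a Docker container. A possible invocation, assuming the scripts are run from the `cli/` directory since the dispatcher calls its sub-scripts by relative path:

```bash
cd cli

# One-time configuration; with no flags the script prompts for the
# pipeline directory, data directory and database, then writes CONFIG.json
python3 carmin_server.py setup

# Launch the server (port 8080 by default); pass --container to build and
# run the Docker image with the directories recorded in CONFIG.json
python3 carmin_server.py run
```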