Refactor module loader #455

Open · wants to merge 14 commits into base: master
6 changes: 0 additions & 6 deletions backend/__init__.py
@@ -1,10 +1,4 @@
import sys
import os

from common.lib.module_loader import ModuleCollector

# load modules
all_modules = ModuleCollector()

# add 4CAT root as import path
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)) + "/..")
6 changes: 5 additions & 1 deletion backend/bootstrap.py
@@ -8,6 +8,7 @@

from common.lib.queue import JobQueue
from common.lib.database import Database
from common.lib.module_loader import ModuleCollector
from backend.lib.manager import WorkerManager
from common.lib.logger import Logger

@@ -66,9 +67,12 @@ def run(as_daemon=True, log_level="INFO"):
config.with_db(db)
config.ensure_database()

# load 4CAT modules and cache the results
modules = ModuleCollector(config=config, write_cache=True)

# make it happen
# this is blocking until the back-end is shut down
WorkerManager(logger=log, database=db, queue=queue, as_daemon=as_daemon)
WorkerManager(logger=log, database=db, queue=queue, modules=modules, as_daemon=as_daemon)

# clean up pidfile, if running as daemon
if as_daemon:
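Taken together with the `backend/__init__.py` change above, this is the heart of the refactor: the module collector is no longer a module-level singleton built at import time, but an object built once in bootstrap and handed to the `WorkerManager`. A minimal sketch of the new wiring, with the 4CAT classes reduced to stubs purely for illustration:

```python
# Illustrative stubs only; the real ModuleCollector and WorkerManager are the
# 4CAT classes changed in this PR. The point is the injection pattern.

class ModuleCollector:
    def __init__(self, config, write_cache=False):
        self.config = config        # the config manager travels with the collector
        self.workers = {}           # jobtype -> worker class (filled by scanning)
        self.datasources = {}
        self.log_buffer = ""        # warnings collected before a logger exists


class WorkerManager:
    def __init__(self, queue, database, logger, modules, as_daemon=True):
        # the collector arrives as a constructor argument instead of being
        # imported as the former module-level `all_modules`
        self.modules = modules


def run(config, db, queue, log, as_daemon=True):
    modules = ModuleCollector(config=config, write_cache=True)   # built exactly once
    WorkerManager(logger=log, database=db, queue=queue, modules=modules,
                  as_daemon=as_daemon)
```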
26 changes: 14 additions & 12 deletions backend/lib/manager.py
@@ -4,7 +4,6 @@
import signal
import time

from backend import all_modules
from common.lib.exceptions import JobClaimedException


@@ -15,24 +14,27 @@ class WorkerManager:
queue = None
db = None
log = None
modules = None

worker_pool = {}
job_mapping = {}
pool = []
looping = True

def __init__(self, queue, database, logger, as_daemon=True):
def __init__(self, queue, database, logger, modules, as_daemon=True):
"""
Initialize manager

:param queue: Job queue
:param database: Database handler
:param logger: Logger object
:param modules: Modules cache, as created by ModuleCollector()
:param bool as_daemon: Whether the manager is being run as a daemon
"""
self.queue = queue
self.db = database
self.log = logger
self.modules = modules

if as_daemon:
signal.signal(signal.SIGTERM, self.abort)
@@ -43,7 +45,7 @@ def __init__(self, queue, database, logger, as_daemon=True):
self.validate_datasources()

# queue jobs for workers that always need one
for worker_name, worker in all_modules.workers.items():
for worker_name, worker in self.modules.workers.items():
if hasattr(worker, "ensure_job"):
self.queue.add_job(jobtype=worker_name, **worker.ensure_job)

@@ -52,9 +54,9 @@
# flush module collector log buffer
# the logger is not available when this initialises
# but it is now!
if all_modules.log_buffer:
self.log.warning(all_modules.log_buffer)
all_modules.log_buffer = ""
if self.modules.log_buffer:
self.log.warning(self.modules.log_buffer)
self.modules.log_buffer = ""

# it's time
self.loop()
@@ -86,8 +88,8 @@ def delegate(self):
for job in jobs:
jobtype = job.data["jobtype"]

if jobtype in all_modules.workers:
worker_class = all_modules.workers[jobtype]
if jobtype in self.modules.workers:
worker_class = self.modules.workers[jobtype]
if jobtype not in self.worker_pool:
self.worker_pool[jobtype] = []

@@ -96,7 +98,7 @@
if len(self.worker_pool[jobtype]) < worker_class.max_workers:
try:
job.claim()
worker = worker_class(logger=self.log, manager=self, job=job, modules=all_modules)
worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
worker.start()
self.log.debug(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}")
self.worker_pool[jobtype].append(worker)
@@ -162,11 +164,11 @@ def validate_datasources(self):
sources.
"""

for datasource in all_modules.datasources:
if datasource + "-search" not in all_modules.workers and datasource + "-import" not in all_modules.workers:
for datasource in self.modules.datasources:
if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)

all_modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource)
self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource)

def abort(self, signal=None, stack=None):
"""
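Every place that used to reach for the `all_modules` global now goes through the injected `self.modules`, and the collector is passed on to each worker the manager spawns. Condensed into one illustrative method (not the PR's literal code), the delegation logic above follows this shape:

```python
def delegate_one(self, job):
    # Illustrative condensation of WorkerManager.delegate() as changed here:
    # worker classes are resolved via the injected collector, never a global.
    jobtype = job.data["jobtype"]
    worker_class = self.modules.workers.get(jobtype)
    if worker_class is None:
        return  # unknown job type; the real manager simply skips it

    pool = self.worker_pool.setdefault(jobtype, [])
    if len(pool) < worker_class.max_workers:
        job.claim()
        worker = worker_class(logger=self.log, manager=self, job=job,
                              modules=self.modules)  # pass the collector along
        worker.start()
        pool.append(worker)
```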
2 changes: 1 addition & 1 deletion backend/lib/preset.py
@@ -35,7 +35,7 @@ def process(self):
pipeline[-1]["parameters"]["next"] = [last]

analysis_pipeline = DataSet(parameters=pipeline[0]["parameters"], type=pipeline[0]["type"], db=self.db,
parent=self.dataset.key)
parent=self.dataset.key, modules=self.modules)

# this starts the pipeline
self.queue.add_job(pipeline[0]["type"], remote_id=analysis_pipeline.key)
39 changes: 19 additions & 20 deletions backend/lib/processor.py
@@ -19,7 +19,7 @@
from common.lib.helpers import get_software_commit, remove_nuls, send_email
from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException,
DataSetException, MapItemException)
from common.config_manager import config, ConfigWrapper
from common.config_manager import ConfigWrapper
from common.lib.user import User


@@ -37,14 +37,14 @@ class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta):
useful is another question).

To determine whether a processor can process a given dataset, you can
define a `is_compatible_with(FourcatModule module=None, str user=None):) -> bool` class
define a `is_compatible_with(FourcatModule module=None, config=None) -> bool` class
method which takes a dataset as argument and returns a bool that determines
if this processor is considered compatible with that dataset. For example:

.. code-block:: python

@classmethod
def is_compatible_with(cls, module=None, user=None):
def is_compatible_with(cls, module=None, config=None):
return module.type == "linguistic-features"


@@ -101,19 +101,18 @@ def work(self):
try:
# a dataset can have multiple owners, but the creator is the user
# that actually queued the processor, so their config is relevant
self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db)
self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
self.owner = self.dataset.creator
except DataSetException as e:
# query has been deleted in the meantime. finish without error,
# as deleting it will have been a conscious choice by a user
self.job.finish()
return

# set up config reader using the worker's DB connection and the dataset
# creator. This ensures that if a value has been overriden for the owner,
# the overridden value is used instead.
config.with_db(self.db)
self.config = ConfigWrapper(config=config, user=User.get_by_name(self.db, self.owner))
# set up a config reader wrapping the worker's config manager, which was
# passed down by the WorkerManager and was originally loaded in bootstrap
self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))

if self.dataset.data.get("key_parent", None):
# search workers never have parents (for now), so we don't need to
@@ -170,7 +169,7 @@ def work(self):
# get parameters
# if possible, fill defaults where parameters are not provided
given_parameters = self.dataset.parameters.copy()
all_parameters = self.get_options(self.dataset)
all_parameters = self.get_options(self.dataset, config=self.config)
self.parameters = {
param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
for param in [*all_parameters.keys(), *given_parameters.keys()]
@@ -179,7 +178,7 @@
# now the parameters have been loaded into memory, clear any sensitive
# ones. This has a side-effect that a processor may not run again
# without starting from scratch, but this is the price of progress
options = self.get_options(self.dataset.get_parent())
options = self.get_options(self.dataset.get_parent(), config=self.config)
for option, option_settings in options.items():
if option_settings.get("sensitive"):
self.dataset.delete_parameter(option)
@@ -241,7 +240,7 @@ def after_process(self):
next_parameters = next.get("parameters", {})
next_type = next.get("type", "")
try:
available_processors = self.dataset.get_available_processors(user=self.dataset.creator)
available_processors = self.dataset.get_available_processors(config=self.config)
except ValueError:
self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
break
@@ -259,7 +258,8 @@
parent=self.dataset.key,
extension=available_processors[next_type].extension,
is_private=self.dataset.is_private,
owner=self.dataset.creator
owner=self.dataset.creator,
modules=self.modules
)
self.queue.add_job(next_type, remote_id=next_analysis.key)
else:
@@ -328,7 +328,7 @@ def after_process(self):

self.job.finish()

if config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
owner = self.dataset.get_parameters().get("email-complete", False)
# Check that username is email address
if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
@@ -339,8 +339,8 @@
import html2text

self.log.debug("Sending email to %s" % owner)
dataset_url = ('https://' if config.get('flask.https') else 'http://') + config.get('flask.server_name') + '/results/' + self.dataset.key
sender = config.get('mail.noreply')
dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key
sender = self.config.get('mail.noreply')
message = MIMEMultipart("alternative")
message["From"] = sender
message["To"] = owner
@@ -777,7 +777,7 @@ def is_filter(cls):
return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()

@classmethod
def get_options(cls, parent_dataset=None, user=None):
def get_options(cls, parent_dataset=None, config=None):
"""
Get processor options

@@ -786,12 +786,11 @@ def get_options(cls, parent_dataset=None, user=None):
fine-grained options, e.g. in cases where the availability of options
is partially determined by the parent dataset's parameters.

:param config: Configuration reader, used to determine which options are available in the current context
:param DataSet parent_dataset: An object representing the dataset that
the processor would be run on
:param User user: Flask user the options will be displayed for, in
case they are requested for display in the 4CAT web interface. This can
be used to show some options only to privileges users.
"""

return cls.options if hasattr(cls, "options") else {}

@classmethod
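Processor classmethods now take a `config` reader instead of a `user`, so option availability and compatibility checks can respect per-owner overrides without touching a global config manager. A hedged sketch of a processor written against the new signatures; the class name, option names and the `example.allow-large-jobs` setting are invented for illustration:

```python
from backend.lib.processor import BasicProcessor


class ExampleProcessor(BasicProcessor):
    type = "example-processor"          # jobtype this processor registers as
    category = "Example"

    @classmethod
    def is_compatible_with(cls, module=None, config=None):
        # compatibility may now consult configuration as well as the dataset
        return module is not None and module.type == "linguistic-features"

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        # option dict format abbreviated; the config gating is the point here
        options = {
            "amount": {"type": "string", "default": "100", "help": "Items to process"}
        }
        if config and config.get("example.allow-large-jobs", False):
            options["amount"]["default"] = "10000"
        return options

    def process(self):
        self.dataset.finish(0)          # placeholder body for the sketch
```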
8 changes: 3 additions & 5 deletions backend/lib/scraper.py
@@ -10,8 +10,6 @@
from pathlib import Path
from backend.lib.worker import BasicWorker

from common.config_manager import config

class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta):
"""
Abstract JSON scraper class
@@ -75,13 +73,13 @@ def work(self):
try:
# see if any proxies were configured that would work for this URL
protocol = url.split(":")[0]
if protocol in config.get('SCRAPE_PROXIES', []) and config.get('SCRAPE_PROXIES')[protocol]:
proxies = {protocol: random.choice(config.get('SCRAPE_PROXIES')[protocol])}
if protocol in self.config.get('SCRAPE_PROXIES', []) and self.config.get('SCRAPE_PROXIES')[protocol]:
proxies = {protocol: random.choice(self.config.get('SCRAPE_PROXIES')[protocol])}
else:
proxies = None

# do the request!
data = requests.get(url, timeout=config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
data = requests.get(url, timeout=self.config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
except (requests.exceptions.RequestException, ConnectionRefusedError) as e:
if self.job.data["attempts"] > 2:
self.job.finish()
3 changes: 2 additions & 1 deletion backend/lib/search.py
@@ -63,7 +63,8 @@ def process(self):
query_parameters = self.dataset.get_parameters()
results_file = self.dataset.get_results_path()

self.log.info("Querying: %s" % str({k: v for k, v in query_parameters.items() if not self.get_options().get(k, {}).get("sensitive", False)}))
self.log.info("Querying: %s" % str({k: v for k, v in query_parameters.items() if not self.get_options(
config=self.config).get(k, {}).get("sensitive", False)}))

# Execute the relevant query (string-based, random, countryflag-based)
try:
8 changes: 4 additions & 4 deletions backend/lib/worker.py
@@ -86,12 +86,12 @@ def __init__(self, logger, job, queue=None, manager=None, modules=None):
self.manager = manager
self.job = job
self.init_time = int(time.time())
self.config = ConfigDummy()
self.config = modules.config

# all_modules cannot be easily imported into a worker because all_modules itself
# ModuleCollector cannot be easily imported into a worker because it itself
# imports all workers, so you get a recursive import that Python (rightly) blocks
# so for workers, all_modules' content is passed as a constructor argument
self.all_modules = modules
# so for workers, the module collection is passed as a constructor argument
self.modules = modules

database_appname = "%s-%s" % (self.type, self.job.data["id"])
self.db = Database(logger=self.log, appname=database_appname,
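The rewritten comment in `worker.py` captures the constraint that drives the whole design: the collector imports every worker module when it scans, so a worker importing the collector back would create a circular import at startup. Injection sidesteps that, and also gives each worker its config handle. A minimal sketch of the constructor side of that contract (attribute names follow the diff; the rest is illustrative):

```python
class BasicWorker:
    def __init__(self, logger, job, queue=None, manager=None, modules=None):
        self.log = logger
        self.job = job
        self.manager = manager
        # the already-built collector is handed in rather than imported,
        # avoiding ModuleCollector -> worker -> ModuleCollector import cycles
        self.modules = modules
        # the config manager rides along on the collector (modules.config)
        self.config = modules.config if modules else None
```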
8 changes: 5 additions & 3 deletions backend/workers/api.py
@@ -2,7 +2,6 @@
import time
import json

from common.config_manager import config
from backend.lib.worker import BasicWorker


@@ -15,8 +14,8 @@ class InternalAPI(BasicWorker):

ensure_job = {"remote_id": "localhost"}

host = config.get('API_HOST')
port = config.get('API_PORT')
host = None
port = None

def work(self):
"""
@@ -27,6 +26,9 @@ def work(self):

:return:
"""
self.host = self.config.get('API_HOST')
self.port = self.config.get('API_PORT')

if self.port == 0:
# if configured not to listen, just loop until the backend shuts
# down we can't return here immediately, since this is a worker,
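In `api.py`, `host` and `port` stop being read at class-definition time. Class attributes are evaluated when the module is imported, before any per-instance `self.config` exists, so the values are now read inside `work()` instead. The same before/after, compressed into a short sketch (the `BasicWorker` stub from above is assumed):

```python
class InternalAPI(BasicWorker):
    ensure_job = {"remote_id": "localhost"}

    # before: host = config.get('API_HOST') ran at import time, against the
    # global config manager. Now the attributes stay inert until work().
    host = None
    port = None

    def work(self):
        self.host = self.config.get("API_HOST")   # read via the injected config
        self.port = self.config.get("API_PORT")
        if self.port == 0:
            return  # configured not to listen; the real worker idles instead
```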
7 changes: 3 additions & 4 deletions backend/workers/check_updates.py
@@ -2,7 +2,6 @@
import requests
import json

from common.config_manager import config
from common.lib.helpers import add_notification, get_github_version
from backend.lib.worker import BasicWorker
from pathlib import Path
@@ -22,11 +21,11 @@ class UpdateChecker(BasicWorker):
max_workers = 1

# check once every three hours
ensure_job = {"remote_id": config.get("4cat.github_url"), "interval": 10800}
ensure_job = {"remote_id": "github", "interval": 10800}

def work(self):
versionfile = Path(config.get("PATH_ROOT"), "config/.current-version")
repo_url = config.get("4cat.github_url")
versionfile = Path(self.config.get("PATH_ROOT"), "config/.current-version")
repo_url = self.config.get("4cat.github_url")

if not versionfile.exists() or not repo_url:
# need something to compare against...
3 changes: 1 addition & 2 deletions backend/workers/cleanup_tempfiles.py
@@ -6,7 +6,6 @@

from pathlib import Path

from common.config_manager import config
from backend.lib.worker import BasicWorker
from common.lib.dataset import DataSet
from common.lib.exceptions import WorkerInterruptedException, DataSetException
@@ -34,7 +33,7 @@ def work(self):
:return:
"""

result_files = Path(config.get('PATH_DATA')).glob("*")
result_files = Path(self.config.get('PATH_DATA')).glob("*")
for file in result_files:
if file.stem.startswith("."):
# skip hidden files