From 6e842cbbe5a2fccee91a2137654b05f907f0a140 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Fri, 2 Aug 2024 08:44:45 +0100 Subject: [PATCH] Develop (#82) * allow paused tasks, use scheduler timezone instead of tzlocal, make tzlocal optional (#80) Changes: - cleanup unused to_bool - pass timezone info from scheduler to get_next_trigger_time, so they are scheduler specific - make tzlocal optional and fallback to utc - allow to_datetime be used without timezone - allow submitting paused tasks - replace UndefinedType by Undefined and add a shim for backward compatibility - shrink pending_tasks, store_alias is now set in add_task - allow updating task attributes while submitting via add_task - fix incorrect fn references in documentation - update documentation - allow updating more attributes in-place in add_task - remove old types from TaskState - allow submitting tasks only once (set a fuse submitted) - add tests - update release notes * optional loguru, allow switching logging (#81) - make loguru optional, fallback to normal logging - make normal logging selectable by overwriting either per scheduler or per default_loggers_class in asyncz.schedulers.base - splitout process_pool from pool and pass special logger - provide tests - update release notes and docs * fix test workflow and docker-compose * fix python 3.8 typings * bump version * add warning about flaky integration tests * fix spurious integration test failures --------- Co-authored-by: Alexander --- .github/workflows/test-suite.yml | 17 ++- asyncz/__init__.py | 2 +- asyncz/datastructures.py | 10 +- asyncz/executors/__init__.py | 3 +- asyncz/executors/asyncio.py | 9 +- asyncz/executors/base.py | 41 +++--- asyncz/executors/debug.py | 5 +- asyncz/executors/pool.py | 24 +--- asyncz/executors/process_pool.py | 81 +++++++++++ asyncz/executors/types.py | 6 +- asyncz/schedulers/base.py | 233 ++++++++++++++++++++++--------- asyncz/schedulers/defaults.py | 2 +- asyncz/schedulers/types.py | 32 +++-- asyncz/stores/base.py | 7 +- asyncz/stores/mongo.py | 4 +- asyncz/stores/redis.py | 4 +- asyncz/stores/sqlalchemy.py | 4 +- asyncz/tasks/base.py | 22 +-- asyncz/tasks/types.py | 17 ++- asyncz/triggers/combination.py | 15 +- asyncz/triggers/cron/trigger.py | 41 ++++-- asyncz/triggers/date.py | 8 +- asyncz/triggers/interval.py | 36 +++-- asyncz/triggers/types.py | 4 +- asyncz/typing.py | 6 +- asyncz/utils.py | 62 ++++---- docker-compose.yml | 2 +- docs/release-notes.md | 22 +++ docs/schedulers.md | 45 +++++- docs/tasks.md | 3 +- docs_src/schedulers/add_task.py | 21 ++- docs_src/schedulers/method1.py | 20 ++- docs_src/schedulers/method2.py | 7 +- docs_src/schedulers/method3.py | 7 +- docs_src/tasks/create_task.py | 13 ++ docs_src/triggers/custom.py | 4 +- pyproject.toml | 5 +- tests/conftest.py | 7 +- tests/test_executors.py | 6 +- tests/test_integrations.py | 17 ++- tests/test_schedulers.py | 112 ++++++++++++++- tests/test_stores.py | 5 +- tests/test_tasks.py | 6 +- tests/test_triggers.py | 195 +++++++++++++++----------- tests/test_utils.py | 36 ----- 45 files changed, 850 insertions(+), 378 deletions(-) create mode 100644 asyncz/executors/process_pool.py diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 9f6aa3c..03ff28e 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -18,6 +18,17 @@ jobs: strategy: matrix: python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + services: + redis: + image: redis + ports: + - 127.0.0.1:6379:6379 + + mongodb: + image: mongo + ports: + - 
127.0.0.1:27017:27017 + steps: - uses: "actions/checkout@v4" - uses: "actions/setup-python@v5" @@ -29,14 +40,12 @@ jobs: with: path: ${{ env.pythonLocation }} key: ${{ runner.os }}-python-${{ env.pythonLocation }}-${{ hashFiles('pyproject.toml') }}-test-v02 - - name: "Start docker services" - run: docker-compose up -d - name: "Install dependencies" if: steps.cache.outputs.cache-hit != 'true' run: "pip install hatch" - name: "Run linting" run: "hatch fmt --check" - # - name: "Run mypy" - # run: "hatch run test:check_types" + - name: "Run mypy" + run: "hatch run test:check_types" - name: "Run tests" run: "hatch test" diff --git a/asyncz/__init__.py b/asyncz/__init__.py index 1f4c4d4..ae6db5f 100644 --- a/asyncz/__init__.py +++ b/asyncz/__init__.py @@ -1 +1 @@ -__version__ = "0.10.1" +__version__ = "0.11.0" diff --git a/asyncz/datastructures.py b/asyncz/datastructures.py index d358a3b..5103075 100644 --- a/asyncz/datastructures.py +++ b/asyncz/datastructures.py @@ -1,5 +1,4 @@ from datetime import datetime, timedelta, tzinfo -from datetime import timezone as dt_timezone from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union from pydantic import BaseModel, ConfigDict @@ -29,10 +28,10 @@ class IntervalState(BaseDatastructureState): Handles the state for a IntervalTrigger. """ - timezone: Union[dt_timezone, str, tzinfo] + timezone: Optional[tzinfo] = None start_at: datetime end_at: Optional[datetime] = None - interval: Optional[Union[dt_timezone, timedelta]] = None + interval: Optional[timedelta] = None jitter: Optional[int] = None @@ -50,7 +49,7 @@ class CronState(BaseDatastructureState): Handles the state of the CronTrigger. """ - timezone: Optional[Union[dt_timezone, str, tzinfo]] = None + timezone: Optional[tzinfo] = None start_at: Optional[datetime] = None end_at: Optional[datetime] = None fields: Optional[List[Any]] = None @@ -65,11 +64,10 @@ class TaskState(BaseDatastructureState): # type: ignore args: Optional[Any] = None kwargs: Optional[Any] = None coalesce: Optional[bool] = None - trigger: Optional[Union[str, TriggerType]] = None + trigger: Optional[TriggerType] = None executor: Optional[str] = None mistrigger_grace_time: Optional[int] = None max_instances: Optional[int] = None next_run_time: Optional[datetime] = None scheduler: Optional[Any] = None store_alias: Optional[str] = None - store: Optional[Union[str, StoreType]] = None diff --git a/asyncz/executors/__init__.py b/asyncz/executors/__init__.py index 6e1f2e9..4e80b87 100644 --- a/asyncz/executors/__init__.py +++ b/asyncz/executors/__init__.py @@ -1,7 +1,8 @@ from .asyncio import AsyncIOExecutor from .base import BaseExecutor from .debug import DebugExecutor -from .pool import ProcessPoolExecutor, ThreadPoolExecutor +from .pool import ThreadPoolExecutor +from .process_pool import ProcessPoolExecutor __all__ = [ "BaseExecutor", diff --git a/asyncz/executors/asyncio.py b/asyncz/executors/asyncio.py index 0912ff7..eb85e61 100644 --- a/asyncz/executors/asyncio.py +++ b/asyncz/executors/asyncio.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, Any, List, Set +from typing import TYPE_CHECKING, Any, List, Set, cast from asyncz.executors.base import BaseExecutor, run_coroutine_task, run_task from asyncz.utils import iscoroutinefunction_partial @@ -35,6 +35,7 @@ def shutdown(self, wait: bool = True) -> None: def do_send_task(self, task: "TaskType", run_times: List[datetime]) -> None: task_id = task.id assert task_id is not None, "Cannot send decorator type task" + assert self.logger is 
not None, "logger is None" def callback(fn: Any) -> None: self.pending_futures.discard(fn) @@ -46,11 +47,13 @@ def callback(fn: Any) -> None: self.run_task_success(task_id, events) if iscoroutinefunction_partial(task.fn): - coroutine = run_coroutine_task(task, task.store_alias, run_times, self.logger) # type: ignore + coroutine = run_coroutine_task( + task, cast(str, task.store_alias), run_times, self.logger + ) fn = self.event_loop.create_task(coroutine) else: fn = self.event_loop.run_in_executor( - None, run_task, task, task.store_alias, run_times, self.logger + None, run_task, task, cast(str, task.store_alias), run_times, self.logger ) fn.add_done_callback(callback) diff --git a/asyncz/executors/base.py b/asyncz/executors/base.py index af1e2f1..a04e1ce 100644 --- a/asyncz/executors/base.py +++ b/asyncz/executors/base.py @@ -1,3 +1,4 @@ +import logging import sys import traceback from collections import defaultdict @@ -5,22 +6,18 @@ from datetime import timezone as tz from threading import RLock from traceback import format_tb -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, cast - -from loguru import logger -from loguru._logger import Logger +from typing import TYPE_CHECKING, Any, Callable, Dict, List, cast from asyncz.events import TaskExecutionEvent from asyncz.events.constants import TASK_ERROR, TASK_EXECUTED, TASK_MISSED from asyncz.exceptions import MaximumInstancesError from asyncz.executors.types import ExecutorType -from asyncz.state import BaseStateExtra if TYPE_CHECKING: from asyncz.tasks.types import TaskType -class BaseExecutor(BaseStateExtra, ExecutorType): +class BaseExecutor(ExecutorType): """ Base model for the executors. It defines the interface for all the executors used by the Asyncz. @@ -28,7 +25,7 @@ class BaseExecutor(BaseStateExtra, ExecutorType): """ def __init__(self, **kwargs: Any) -> None: - super().__init__(**kwargs) + super().__init__() self.instances: Dict[str, int] = defaultdict(lambda: 0) def start(self, scheduler: Any, alias: str) -> None: @@ -42,8 +39,9 @@ def start(self, scheduler: Any, alias: str) -> None: """ self.scheduler = scheduler self.lock: RLock = scheduler.create_lock() - self.logger: Any = logger - self.logger.bind(logger_name=f"asyncz.executors.{alias}") + self.logger_name = f"asyncz.executors.{alias}" + # send to tasks + self.logger = self.scheduler.loggers[self.logger_name] def shutdown(self, wait: bool = True) -> None: """ @@ -92,14 +90,16 @@ def run_task_error(self, task_id: str) -> None: if self.instances[task_id] == 0: del self.instances[task_id] - self.logger.opt(exception=True).error(f"Error running task {task_id}", exc_info=True) + self.scheduler.loggers[self.logger_name].error( + f"Error running task {task_id}", exc_info=True + ) def run_task( task: "TaskType", store_alias: str, run_times: List[datetime], - _logger: Optional[Any] = None, + logger: logging.Logger, ) -> List[TaskExecutionEvent]: """ Called by executors to run the task. Returns a list of scheduler events to be dispatched by the @@ -107,8 +107,6 @@ def run_task( The run task is made to run in async mode. 
""" - if not _logger: - _logger = logger events = [] for run_time in run_times: @@ -125,10 +123,10 @@ def run_task( scheduled_run_time=run_time, ) ) - _logger.warning(f"Run time of task '{task}' was missed by {difference}") + logger.warning(f"Run time of task '{task}' was missed by {difference}") continue - _logger.info(f'Running task "{task}" (scheduled at {run_time})') + logger.info(f'Running task "{task}" (scheduled at {run_time})') try: return_value = cast(Callable[..., Any], task.fn)(*task.args, **task.kwargs) except Exception as exc: @@ -156,7 +154,7 @@ def run_task( return_value=return_value, ) ) - _logger.info(f"Task '{task}' executed successfully.") + logger.info(f"Task '{task}' executed successfully.") return events @@ -164,7 +162,7 @@ async def run_coroutine_task( task: "TaskType", store_alias: str, run_times: List[datetime], - _logger: Optional["Logger"] = None, + logger: logging.Logger, ) -> List[TaskExecutionEvent]: """ Called by executors to run the task. Returns a list of scheduler events to be dispatched by the @@ -172,9 +170,6 @@ async def run_coroutine_task( The run task is made to run in async mode. """ - if not _logger: - _logger = logger # type: ignore - events = [] for run_time in run_times: mistrigger_grace_time = task.mistrigger_grace_time @@ -190,10 +185,10 @@ async def run_coroutine_task( scheduled_run_time=run_time, ) ) - _logger.warning(f"Run time of task '{task}' was missed by {difference}") # type: ignore + logger.warning(f"Run time of task '{task}' was missed by {difference}") continue - _logger.info(f'Running task "{task}" (scheduled at {run_time})') # type: ignore + logger.info(f'Running task "{task}" (scheduled at {run_time})') try: return_value = await cast(Callable[..., Any], task.fn)(*task.args, **task.kwargs) except Exception as exc: @@ -221,6 +216,6 @@ async def run_coroutine_task( return_value=return_value, ) ) - _logger.info(f"Task '{task}' executed successfully") # type: ignore + logger.info(f"Task '{task}' executed successfully") return events diff --git a/asyncz/executors/debug.py b/asyncz/executors/debug.py index 0369481..20bf8d9 100644 --- a/asyncz/executors/debug.py +++ b/asyncz/executors/debug.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, List +from typing import TYPE_CHECKING, List, cast from asyncz.executors.base import BaseExecutor, run_task @@ -19,8 +19,9 @@ def do_send_task( run_times: List[datetime], ) -> None: assert task.id is not None, "Cannot send decorator type task" + assert self.logger is not None, "logger is None" try: - events = run_task(task, task.store_alias, run_times, self.logger) # type: ignore + events = run_task(task, cast(str, task.store_alias), run_times, self.logger) except Exception: self.run_task_error(task.id) else: diff --git a/asyncz/executors/pool.py b/asyncz/executors/pool.py index ba91b71..4aaf3bb 100644 --- a/asyncz/executors/pool.py +++ b/asyncz/executors/pool.py @@ -39,9 +39,11 @@ def callback(fn: Any) -> None: self.run_task_success(task_id, fn.result()) try: - fn = self.pool.submit(run_task, task, task.store_alias, run_times) + fn = self.pool.submit(run_task, task, task.store_alias, run_times, self.logger) except (BrokenProcessPool, TypeError): - self.logger.warning("Process pool is broken. Replacing pool with a new instance.") + self.scheduler.loggers[self.logger_name].warning( + "Process pool is broken. Replacing pool with a new instance." 
+ ) self.pool = self.pool.__class__(self.pool.max_workers) fn = self.pool.submit(run_task, task, task.store_alias, run_times, self.logger) @@ -68,21 +70,3 @@ def __init__(self, max_workers: int = 10, pool_kwargs: Optional[Any] = None, **k pool_kwargs = pool_kwargs or {} pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs) super().__init__(pool, **kwargs) - - -class ProcessPoolExecutor(BasePoolExecutor): - """ - An executor that runs tasks in a concurrent.futures process pool. - - Args: - max_workers: The maximum number of spawned processes. - pool_kwargs: Dict of keyword arguments to pass to the underlying - ProcessPoolExecutor constructor. - """ - - def __init__( - self, max_workers: int = 10, pool_kwargs: Optional[Any] = None, **kwargs: Any - ) -> None: - pool_kwargs = pool_kwargs or {} - pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs) - super().__init__(pool, **kwargs) diff --git a/asyncz/executors/process_pool.py b/asyncz/executors/process_pool.py new file mode 100644 index 0000000..dadc051 --- /dev/null +++ b/asyncz/executors/process_pool.py @@ -0,0 +1,81 @@ +import concurrent.futures +from multiprocessing import Pipe, connection +from threading import Thread +from typing import TYPE_CHECKING, Any, Optional, cast + +from asyncz.executors.pool import BasePoolExecutor + +if TYPE_CHECKING: + import logging + + from asyncz.schedulers.types import SchedulerType + +# because multiprocessing is heavy weight, it is split out from pool + + +class ProcessPoolLoggerSenderFnWrap: + def __init__(self, send_pipe: connection.Connection, fn_name: str) -> None: + self.send_pipe = send_pipe + self.fn_name = fn_name + + def __call__(self, *args: Any, **kwargs: Any) -> None: + self.send_pipe.send((self.fn_name, args, kwargs)) + + +class ProcessPoolLoggerSender: + def __init__(self, send_pipe: connection.Connection) -> None: + self.send_pipe = send_pipe + + def __getattr__(self, item: str) -> Any: + if item.startswith("_"): + return object.__getattr__(self, item) + return ProcessPoolLoggerSenderFnWrap(self.send_pipe, item) + + +class ProcessPoolReceiver(Thread): + def __init__(self, receive_pipe: connection.Connection, logger: "logging.Logger") -> None: + super().__init__() + self.receive_pipe = receive_pipe + self.logger = logger + self.start() + + def run(self) -> None: + try: + while True: + fn_name, args, kwargs = self.receive_pipe.recv() + if not fn_name.startswith("_"): + getattr(self.logger, fn_name)(*args, **kwargs) + except EOFError: + pass + + +class ProcessPoolExecutor(BasePoolExecutor): + """ + An executor that runs tasks in a concurrent.futures process pool. + + Args: + max_workers: The maximum number of spawned processes. + pool_kwargs: Dict of keyword arguments to pass to the underlying + ProcessPoolExecutor constructor. 
+ """ + + def __init__( + self, max_workers: int = 10, pool_kwargs: Optional[Any] = None, **kwargs: Any + ) -> None: + self.receive_pipe, self.send_pipe = Pipe(False) + pool_kwargs = pool_kwargs or {} + pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs) + super().__init__(pool, **kwargs) + + def start(self, scheduler: "SchedulerType", alias: str) -> None: + super().start(scheduler, alias) + assert self.logger is not None, "logger is None" + # move the old logger to logger_receiver + self.logger_receiver = ProcessPoolReceiver(self.receive_pipe, self.logger) + # and send the process logger instead + self.logger = cast("logging.Logger", ProcessPoolLoggerSender(self.send_pipe)) + + def shutdown(self, wait: bool = True) -> None: + super().shutdown(wait=wait) + self.send_pipe.close() + self.logger_receiver.join() diff --git a/asyncz/executors/types.py b/asyncz/executors/types.py index cc15525..101a0a8 100644 --- a/asyncz/executors/types.py +++ b/asyncz/executors/types.py @@ -2,15 +2,19 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import TYPE_CHECKING, Any, List +from typing import TYPE_CHECKING, Any, List, Optional if TYPE_CHECKING: + import logging + from asyncz.events import TaskExecutionEvent from asyncz.schedulers.types import SchedulerType from asyncz.tasks.types import TaskType class ExecutorType(ABC): + logger: Optional[logging.Logger] + @abstractmethod def start(self, scheduler: SchedulerType, alias: str) -> None: """ diff --git a/asyncz/schedulers/base.py b/asyncz/schedulers/base.py index 202d8f4..76493cc 100644 --- a/asyncz/schedulers/base.py +++ b/asyncz/schedulers/base.py @@ -1,3 +1,4 @@ +import logging import sys import warnings from abc import abstractmethod @@ -5,7 +6,7 @@ from datetime import datetime, timedelta from functools import partial from importlib import import_module -from threading import Lock, RLock +from threading import TIMEOUT_MAX, Lock, RLock from typing import ( Any, Callable, @@ -19,9 +20,6 @@ overload, ) -from loguru import logger -from tzlocal import get_localzone - from asyncz.enums import PluginInstance, SchedulerState from asyncz.events.base import SchedulerEvent, TaskEvent, TaskSubmissionEvent from asyncz.events.constants import ( @@ -53,14 +51,53 @@ from asyncz.schedulers import defaults from asyncz.schedulers.asgi import ASGIApp, ASGIHelper from asyncz.schedulers.datastructures import TaskDefaultStruct -from asyncz.schedulers.types import SchedulerType +from asyncz.schedulers.types import LoggersType, SchedulerType from asyncz.stores.memory import MemoryStore from asyncz.stores.types import StoreType from asyncz.tasks import Task from asyncz.tasks.types import TaskType from asyncz.triggers.types import TriggerType -from asyncz.typing import UndefinedType, undefined -from asyncz.utils import TIMEOUT_MAX, maybe_ref, timedelta_seconds, to_timezone +from asyncz.typing import Undefined, undefined +from asyncz.utils import maybe_ref, timedelta_seconds, to_timezone_with_fallback + + +class ClassicLogging(LoggersType): + def __missing__(self, item: str) -> logging.Logger: + return logging.getLogger(item) + + +class LoguruLoggerFnWrapper: + def __init__(self, logger: Any, fn_name: str) -> None: + self.logger = logger + self.fn_name = fn_name + + def __call__(self, *args: Any, exc_info: bool = False, **kwargs: Any) -> Any: + logger = self.logger + if exc_info: + logger = logger.opt(exception=True) + return getattr(logger, self.fn_name)(*args, **kwargs) + + +class LoguruLoggerWrapper: + def 
__init__(self, logger: Any) -> None: + self.logger = logger + + def __getattr__(self, item: str) -> LoguruLoggerFnWrapper: + return LoguruLoggerFnWrapper(self.logger, item) + + +default_loggers_class: Type[LoggersType] = ClassicLogging + +try: + from loguru import logger + + class LoguruLogging(LoggersType): + def __missing__(self, item: str) -> logging.Logger: + return cast(logging.Logger, LoguruLoggerWrapper(logger.bind(name=item))) + + default_loggers_class = LoguruLogging +except ImportError: + pass class BaseScheduler(SchedulerType): @@ -81,9 +118,12 @@ class BaseScheduler(SchedulerType): state: current running state of the scheduler. """ - def __init__(self, global_config: Optional[Any] = None, **kwargs: Any) -> None: + def __init__( + self, + global_config: Optional[Any] = None, + **kwargs: Any, + ) -> None: super().__init__() - self.global_config: Dict[str, Any] = global_config or {} self.trigger_plugins = dict(defaults.triggers.items()) self.trigger_classes: Dict[str, Type[TriggerType]] = {} self.executor_plugins: Dict[str, str] = dict(defaults.executors.items()) @@ -96,13 +136,12 @@ def __init__(self, global_config: Optional[Any] = None, **kwargs: Any) -> None: self.store_lock: RLock = self.create_lock() self.listeners: List[Any] = [] self.listeners_lock: RLock = self.create_lock() - self.pending_tasks: List[Tuple[TaskType, str, bool]] = [] + self.pending_tasks: List[Tuple[TaskType, bool, bool]] = [] self.state: Union[SchedulerState, Any] = SchedulerState.STATE_STOPPED - self.logger: Any = logger self.ref_counter: int = 0 self.ref_lock: Lock = Lock() - self.setup(self.global_config, **kwargs) + self.setup(global_config, **kwargs) def __getstate__(self) -> None: raise TypeError( @@ -197,12 +236,12 @@ def start(self, paused: bool = False) -> bool: for alias, store in self.stores.items(): store.start(self, alias) - for task, store_alias, replace_existing in self.pending_tasks: - self.real_add_task(task, store_alias, replace_existing) + for task, replace_existing, start_task in self.pending_tasks: + self.real_add_task(task, replace_existing, start_task) del self.pending_tasks[:] self.state = SchedulerState.STATE_PAUSED if paused else SchedulerState.STATE_RUNNING - self.logger.info("Scheduler started.") + self.loggers[self.logger_name].info("Scheduler started.") self.dispatch_event(SchedulerEvent(code=SCHEDULER_START)) if not paused: @@ -232,7 +271,7 @@ def shutdown(self, wait: bool = True) -> bool: for store in self.stores.values(): store.shutdown() - self.logger.info("Scheduler has been shutdown.") + self.loggers[self.logger_name].info("Scheduler has been shutdown.") self.dispatch_event(SchedulerEvent(code=SCHEDULER_SHUTDOWN)) return True @@ -247,7 +286,7 @@ def pause(self) -> None: raise SchedulerNotRunningError() elif self.state == SchedulerState.STATE_RUNNING: self.state = SchedulerState.STATE_PAUSED - self.logger.info("Paused scheduler task processing.") + self.loggers[self.logger_name].info("Paused scheduler task processing.") self.dispatch_event(SchedulerEvent(code=SCHEDULER_PAUSED)) def resume(self) -> None: @@ -258,7 +297,7 @@ def resume(self) -> None: raise SchedulerNotRunningError elif self.state == SchedulerState.STATE_PAUSED: self.state = SchedulerState.STATE_RUNNING - self.logger.info("Resumed scheduler task processing.") + self.loggers[self.logger_name].info("Resumed scheduler task processing.") self.dispatch_event(SchedulerEvent(code=SCHEDULER_RESUMED)) self.wakeup() @@ -417,12 +456,12 @@ def add_task( kwargs: Optional[Any] = None, id: Optional[str] = None, name: 
Optional[str] = None, - mistrigger_grace_time: Union[int, UndefinedType, None] = undefined, - coalesce: Union[bool, UndefinedType] = undefined, - max_instances: Union[int, UndefinedType, None] = undefined, - next_run_time: Union[datetime, str, UndefinedType, None] = undefined, - store: str = "default", - executor: str = "default", + mistrigger_grace_time: Union[int, Undefined, None] = undefined, + coalesce: Union[bool, Undefined] = undefined, + max_instances: Union[int, Undefined, None] = undefined, + next_run_time: Union[datetime, str, Undefined, None] = undefined, + store: Union[str, Undefined, None] = None, + executor: Union[str, Undefined, None] = None, replace_existing: bool = False, # old name fn: Optional[Any] = None, @@ -471,25 +510,60 @@ def add_task( ) fn_or_task = fn if isinstance(fn_or_task, TaskType): + assert fn_or_task.submitted is False, "Can submit tasks only once" + fn_or_task.submitted = True + # tweak task before submitting + # WARNING: in contrast to the decorator mode this really updates the task + # by providing an id (e.g. autogenerated) you can make a real task from decorator type task while submitting + task_update_kwargs: Dict[str, Any] = { + "scheduler": self, + "args": tuple(args) if args is not None else undefined, + "kwargs": dict(kwargs) if kwargs is not None else undefined, + "id": id or undefined, + "name": name or undefined, + "mistrigger_grace_time": mistrigger_grace_time, + "coalesce": coalesce, + "max_instances": max_instances, + "next_run_time": next_run_time, + "executor": executor if executor is not None else undefined, + "store_alias": store if store is not None else undefined, + } + if trigger: + task_update_kwargs["trigger"] = self.create_trigger(trigger, trigger_args) + # WARNING: when submitting a task object allow_mistrigger_by_default has no effect + task_update_kwargs = { + key: value for key, value in task_update_kwargs.items() if value is not undefined + } + fn_or_task.update_task(**task_update_kwargs) + # fallback if still not set. Set manually executor and store_alias to default. + if fn_or_task.executor is None: + fn_or_task.executor = "default" + if fn_or_task.store_alias is None: + fn_or_task.store_alias = "default" + assert fn_or_task.trigger is not None, "Cannot submit a task without a trigger." assert fn_or_task.id is not None, "Cannot submit a decorator type task." - fn_or_task.scheduler = self with self.store_lock: if self.state == SchedulerState.STATE_STOPPED: self.pending_tasks.append( - (fn_or_task, fn_or_task.store_alias or store, replace_existing) + ( + fn_or_task, + replace_existing, + next_run_time is not None, + ) ) - self.logger.info( + self.loggers[self.logger_name].info( "Adding task tentatively. It will be properly scheduled when the scheduler starts." 
) else: self.real_add_task( - fn_or_task, fn_or_task.store_alias or store, replace_existing + fn_or_task, + replace_existing, + next_run_time is not None, ) return fn_or_task task_kwargs: Dict[str, Any] = { "scheduler": self, "trigger": self.create_trigger(trigger, trigger_args), - "executor": executor, "fn": fn_or_task, "args": tuple(args) if args is not None else (), "kwargs": dict(kwargs) if kwargs is not None else {}, @@ -499,7 +573,8 @@ def add_task( "coalesce": coalesce, "max_instances": max_instances, "next_run_time": next_run_time, - "store_alias": store, + "executor": executor if executor is not None else undefined, + "store_alias": store if store is not None else undefined, } task_kwargs = {key: value for key, value in task_kwargs.items() if value is not undefined} if task_kwargs["trigger"].allow_mistrigger_by_default: @@ -510,7 +585,9 @@ def add_task( task = Task(**task_kwargs) if task.fn is not None: - return self.add_task(task, replace_existing=replace_existing) + return self.add_task( + task, replace_existing=replace_existing, next_run_time=next_run_time + ) return task def update_task( @@ -565,7 +642,7 @@ def reschedule_task( """ trigger = self.create_trigger(trigger, trigger_args) now = datetime.now(self.timezone) - next_run_time = trigger.get_next_trigger_time(None, now) + next_run_time = trigger.get_next_trigger_time(self.timezone, None, now) return self.update_task(task_id, store, trigger=trigger, next_run_time=next_run_time) def pause_task(self, task_id: Union[TaskType, str], store: Optional[str] = None) -> "TaskType": @@ -594,7 +671,7 @@ def resume_task( with self.store_lock: task, store = self.lookup_task(task_id, store) now = datetime.now(self.timezone) - next_run_time = task.trigger.get_next_trigger_time(None, now) # type: ignore + next_run_time = task.trigger.get_next_trigger_time(self.timezone, None, now) # type: ignore if next_run_time: return self.update_task(task_id, store, next_run_time=next_run_time) @@ -616,8 +693,8 @@ def get_tasks(self, store: Optional[str] = None) -> List["TaskType"]: with self.store_lock: tasks = [] if self.state == SchedulerState.STATE_STOPPED: - for task, alias, _ in self.pending_tasks: - if store is None or alias == store: + for task, _, _ in self.pending_tasks: + if store is None or task.store_alias == store: tasks.append(task) else: for alias, _store in self.stores.items(): @@ -658,10 +735,10 @@ def delete_task( with self.store_lock: if self.state == SchedulerState.STATE_STOPPED: - for index, (task, alias, _) in enumerate(self.pending_tasks): - if task.id == task_id and store in (None, alias): + for index, (task, _, _) in enumerate(self.pending_tasks): + if task.id == task_id and store in (None, task.store_alias): del self.pending_tasks[index] - store_alias = alias + store_alias = task.store_alias break else: for alias, _store in self.stores.items(): @@ -679,7 +756,7 @@ def delete_task( event = TaskEvent(code=TASK_REMOVED, task_id=task_id, store=store_alias) self.dispatch_event(event) - self.logger.info(f"Removed task {task_id}.") + self.loggers[self.logger_name].info(f"Removed task {task_id}.") def remove_all_tasks(self, store: Optional[str]) -> None: """ @@ -689,7 +766,9 @@ def remove_all_tasks(self, store: Optional[str]) -> None: if self.state == SchedulerState.STATE_STOPPED: if store: self.pending_tasks = [ - pending for pending in self.pending_tasks if pending[1] != store + pending + for pending in self.pending_tasks + if pending[0].store_alias != store ] else: self.pending_tasks = [] @@ -711,11 +790,21 @@ def _setup(self, 
config: Any) -> None: """ Applies initial configurations called by the Base constructor. """ - self.logger = maybe_ref(config.pop("logger", None)) or logger - self.timezone = to_timezone(config.pop("timezone", None)) or get_localzone() + self.timezone = to_timezone_with_fallback(config.pop("timezone", None)) self.store_retry_interval = float(config.pop("store_retry_interval", 10)) self.task_defaults = TaskDefaultStruct(**(config.get("task_defaults", None) or {})) + loggers_class: Type[LoggersType] = ( + maybe_ref(config.pop("loggers_class", None)) or default_loggers_class + ) + self.loggers = loggers_class() + logger_name = config.pop("logger_name", None) + if logger_name: + self.logger_name = f"asyncz.schedulers.{logger_name}" + else: + self.logger_name = "asyncz.schedulers" + # initialize logger for scheduler + self.loggers[self.logger_name] self.executors.clear() for alias, value in config.get("executors", {}).items(): @@ -814,7 +903,7 @@ def lookup_task( """ if self.state == SchedulerState.STATE_STOPPED: for task, _, _ in self.pending_tasks: - if task.id == task_id: + if task.id == task_id and store_alias in (None, task.store_alias): return task, None else: for alias, store in self.stores.items(): @@ -840,7 +929,7 @@ def dispatch_event(self, event: "SchedulerEvent") -> None: try: callback(event) except BaseException: - self.logger.exception("Error notifying listener.") + self.loggers[self.logger_name].exception("Error notifying listener.") def check_uwsgi(self) -> None: """ @@ -854,7 +943,7 @@ def check_uwsgi(self) -> None: "option for the scheduler to work." ) - def real_add_task(self, task: "TaskType", store_alias: str, replace_existing: bool) -> None: + def real_add_task(self, task: "TaskType", replace_existing: bool, start_task: bool) -> None: """ Adds the task. @@ -863,18 +952,22 @@ def real_add_task(self, task: "TaskType", store_alias: str, replace_existing: bo store_alias: The alias of the store to add the task to. replace_existing: The flag indicating the replacement of the task. """ + assert task.trigger is not None, "Submitted task has no trigger set." + assert task.store_alias is not None, "Submitted task has no store_alias set." + assert task.executor is not None, "Submitted task has no executor set." replacements: Dict[str, Any] = {} # Calculate the next run time if there is none defined - if not getattr(task, "next_run_time", None): + if task.next_run_time is None and start_task: now = datetime.now(self.timezone) - replacements["next_run_time"] = task.trigger.get_next_trigger_time(None, now) # type: ignore + replacements["next_run_time"] = task.trigger.get_next_trigger_time( + self.timezone, None, now + ) # Apply replacements - task.pending = False task.update_task(**replacements) # Add the task to the given store - store = self.lookup_store(store_alias) + store = self.lookup_store(task.store_alias) try: store.add_task(task) except ConflictIdError: @@ -882,16 +975,17 @@ def real_add_task(self, task: "TaskType", store_alias: str, replace_existing: bo store.update_task(task) else: raise + task.pending = False - task.store_alias = store_alias - - event = TaskEvent(code=TASK_ADDED, task_id=task.id, alias=store_alias) + event = TaskEvent(code=TASK_ADDED, task_id=task.id, alias=task.store_alias) self.dispatch_event(event) - self.logger.info(f"Added task '{task.name}' to store '{store_alias}'.") + self.loggers[self.logger_name].info( + f"Added task '{task.name}' to store '{task.store_alias}'." + ) # Notify the scheduler about the new task. 
- if self.state == SchedulerState.STATE_RUNNING: + if start_task and self.state == SchedulerState.STATE_RUNNING: self.wakeup() def resolve_load_plugin(self, module_name: str) -> Any: @@ -971,10 +1065,10 @@ def process_tasks(self) -> Optional[float]: store_retry_interval seconds. """ if self.state == SchedulerState.STATE_PAUSED: - self.logger.debug("Scheduler is paused. Not processing tasks.") + self.loggers[self.logger_name].debug("Scheduler is paused. Not processing tasks.") return None - self.logger.debug("Looking for tasks to run.") + self.loggers[self.logger_name].debug("Looking for tasks to run.") now = datetime.now(self.timezone) next_wakeup_time: Optional[datetime] = None events = [] @@ -984,7 +1078,7 @@ def process_tasks(self) -> Optional[float]: try: due_tasks: List[TaskType] = store.get_due_tasks(now) except Exception as e: - self.logger.warning( + self.loggers[self.logger_name].warning( f"Error getting due tasks from the store {store_alias}: {e}." ) retry_wakeup_time = now + timedelta(seconds=self.store_retry_interval) @@ -996,20 +1090,20 @@ def process_tasks(self) -> Optional[float]: try: executor = self.lookup_executor(task.executor) # type: ignore except Exception: - self.logger.error( + self.loggers[self.logger_name].error( f"Executor lookup ('{task.executor}') failed for task '{task}'. Removing it from the store." ) self.delete_task(task.id, store_alias) continue - run_times = task.get_run_times(now) + run_times = task.get_run_times(self.timezone, now) run_times = run_times[-1:] if run_times and task.coalesce else run_times if run_times: try: executor.send_task(task, run_times) except MaxInterationsReached: - self.logger.warning( + self.loggers[self.logger_name].warning( f"Execution of task '{task}' skipped: Maximum number of running " f"instances reached '({task.max_instances})'." ) @@ -1020,9 +1114,10 @@ def process_tasks(self) -> Optional[float]: scheduled_run_times=run_times, ) events.append(event) - except BaseException: - self.logger.exception( - f"Error submitting task '{task}' to executor '{task.executor}'." + except Exception: + self.loggers[self.logger_name].exception( + f"Error submitting task '{task}' to executor '{task.executor}'.", + exc_info=True, ) else: event = TaskSubmissionEvent( @@ -1033,7 +1128,9 @@ def process_tasks(self) -> Optional[float]: ) events.append(event) - next_run = task.trigger.get_next_trigger_time(run_times[-1], now) # type: ignore + next_run = task.trigger.get_next_trigger_time( # type: ignore + self.timezone, run_times[-1], now + ) if next_run: task.update_task(next_run_time=next_run) store.update_task(task) @@ -1051,12 +1148,14 @@ def process_tasks(self) -> Optional[float]: wait_seconds: Optional[float] = None if self.state == SchedulerState.STATE_PAUSED: - self.logger.debug("Scheduler is paused. Waiting until resume() is called.") + self.loggers[self.logger_name].debug( + "Scheduler is paused. Waiting until resume() is called." + ) elif next_wakeup_time is None: - self.logger.debug("No tasks. Waiting until task is added.") + self.loggers[self.logger_name].debug("No tasks. Waiting until task is added.") else: wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX) - self.logger.debug( + self.loggers[self.logger_name].debug( f"Next wakeup is due at {next_wakeup_time} (in {wait_seconds} seconds)." 
) return wait_seconds diff --git a/asyncz/schedulers/defaults.py b/asyncz/schedulers/defaults.py index 043f1bf..96598a5 100644 --- a/asyncz/schedulers/defaults.py +++ b/asyncz/schedulers/defaults.py @@ -13,7 +13,7 @@ "debug": "asyncz.executors.debug:DebugExecutor", "pool": "asyncz.executors.pool:ThreadPoolExecutor", "threadpool": "asyncz.executors.pool:ThreadPoolExecutor", - "processpool": "asyncz.executors.pool:ProcessPoolExecutor", + "processpool": "asyncz.executors.process_pool:ProcessPoolExecutor", "asyncio": "asyncz.executors.asyncio:AsyncIOExecutor", } diff --git a/asyncz/schedulers/types.py b/asyncz/schedulers/types.py index 85c6e28..9401b86 100644 --- a/asyncz/schedulers/types.py +++ b/asyncz/schedulers/types.py @@ -1,15 +1,16 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union from asyncz.events.constants import ( ALL_EVENTS, ) -from asyncz.typing import UndefinedType, undefined +from asyncz.typing import Undefined, undefined if TYPE_CHECKING: from datetime import datetime + from logging import Logger from threading import RLock from asyncz.events.base import SchedulerEvent @@ -19,7 +20,22 @@ from asyncz.triggers.types import TriggerType +class LoggersType(ABC): + def __init__(self) -> None: + self.data: Dict[str, Logger] = {} + + @abstractmethod + def __missing__(self, item: str) -> Logger: ... + + def __getitem__(self, item: str) -> Logger: + if item not in self.data: + self.data[item] = self.__missing__(item) + return self.data[item] + + class SchedulerType(ABC): + loggers: LoggersType + @abstractmethod def start(self, paused: bool = False) -> bool: """ @@ -130,12 +146,12 @@ def add_task( kwargs: Optional[Any] = None, id: Optional[str] = None, name: Optional[str] = None, - mistrigger_grace_time: Union[int, UndefinedType, None] = undefined, - coalesce: Union[bool, UndefinedType] = undefined, - max_instances: Union[int, UndefinedType, None] = undefined, - next_run_time: Union[datetime, str, UndefinedType, None] = undefined, - store: str = "default", - executor: str = "default", + mistrigger_grace_time: Union[int, Undefined, None] = undefined, + coalesce: Union[bool, Undefined] = undefined, + max_instances: Union[int, Undefined, None] = undefined, + next_run_time: Union[datetime, str, Undefined, None] = undefined, + store: Optional[str] = None, + executor: Optional[str] = None, replace_existing: bool = False, **trigger_args: Any, ) -> TaskType: diff --git a/asyncz/stores/base.py b/asyncz/stores/base.py index 20d59c8..54f2f4f 100644 --- a/asyncz/stores/base.py +++ b/asyncz/stores/base.py @@ -3,7 +3,6 @@ from typing import TYPE_CHECKING, Any, List, Optional from cryptography.hazmat.primitives.ciphers.aead import AESCCM -from loguru import logger from asyncz.state import BaseStateExtra from asyncz.stores.types import StoreType @@ -18,10 +17,9 @@ class BaseStore(BaseStateExtra, StoreType): Base class for all task stores. 
""" - def __init__(self, scheduler: Optional["SchedulerType"] = None, **kwargs: Any) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) - self.scheduler = scheduler - self.logger = logger + self.scheduler: Optional[SchedulerType] = None self.encryption_key: Optional[AESCCM] = None def start(self, scheduler: "SchedulerType", alias: str) -> None: @@ -35,6 +33,7 @@ def start(self, scheduler: "SchedulerType", alias: str) -> None: """ self.scheduler = scheduler self.alias = alias + self.logger_name = f"asyncz.stores.{alias}" encryption_key = os.environ.get("ASYNCZ_STORE_ENCRYPTION_KEY") if encryption_key: # we simply use a hash. This way all kinds of tokens, lengths and co are supported diff --git a/asyncz/stores/mongo.py b/asyncz/stores/mongo.py index f734cbf..33bc068 100644 --- a/asyncz/stores/mongo.py +++ b/asyncz/stores/mongo.py @@ -88,7 +88,9 @@ def get_tasks(self, conditions: DictAny) -> List["TaskType"]: tasks.append(self.rebuild_task(document["state"])) except BaseException: doc_id = document["_id"] - self.logger.exception(f"Unable to restore task '{doc_id}'. Removing it...") + cast("SchedulerType", self.scheduler).loggers[self.logger_name].exception( + f"Unable to restore task '{doc_id}'. Removing it..." + ) failed_task_ids.append(doc_id) if failed_task_ids: diff --git a/asyncz/stores/redis.py b/asyncz/stores/redis.py index 7370727..5f4d3c2 100644 --- a/asyncz/stores/redis.py +++ b/asyncz/stores/redis.py @@ -80,7 +80,9 @@ def rebuild_tasks(self, states: Iterable[Tuple[str, Any]]) -> List["TaskType"]: try: tasks.append(self.rebuild_task(state)) except BaseException: - self.logger.exception(f"Unable to restore task '{task_id}'. Removing it...") + cast("SchedulerType", self.scheduler).loggers[self.logger_name].exception( + f"Unable to restore task '{task_id}'. Removing it..." + ) failed_task_ids.append(task_id) if failed_task_ids: diff --git a/asyncz/stores/sqlalchemy.py b/asyncz/stores/sqlalchemy.py index d373710..255e459 100644 --- a/asyncz/stores/sqlalchemy.py +++ b/asyncz/stores/sqlalchemy.py @@ -95,7 +95,9 @@ def get_tasks(self, conditions: Any = None, limit: int = 0) -> List[TaskType]: tasks.append(self.rebuild_task(row.state)) except Exception: task_id = row.id - self.logger.exception(f"Unable to restore task '{task_id}'. Removing it...") + cast("SchedulerType", self.scheduler).loggers[self.logger_name].exception( + f"Unable to restore task '{task_id}'. Removing it..." + ) failed_task_ids.append(task_id) if failed_task_ids: diff --git a/asyncz/tasks/base.py b/asyncz/tasks/base.py index a354d8d..d22ad27 100644 --- a/asyncz/tasks/base.py +++ b/asyncz/tasks/base.py @@ -1,5 +1,5 @@ import inspect -from datetime import datetime +from datetime import datetime, tzinfo from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Union, cast from uuid import uuid4 @@ -60,7 +60,7 @@ def __init__( super().__init__(id=id, **kwargs) self.update_task(fn=fn, **kwargs) - def get_run_times(self, now: datetime) -> List[datetime]: + def get_run_times(self, timezone: tzinfo, now: datetime) -> List[datetime]: """ Computes the scheduled run times `next_run_time` and `now`, inclusive. 
""" @@ -69,7 +69,7 @@ def get_run_times(self, now: datetime) -> List[datetime]: assert self.trigger while next_run_time and next_run_time <= now: run_times.append(next_run_time) - next_run_time = self.trigger.get_next_trigger_time(next_run_time, now) + next_run_time = self.trigger.get_next_trigger_time(timezone, next_run_time, now) return run_times def update_task( # type: ignore @@ -158,6 +158,8 @@ def update_task( # type: ignore if "store_alias" in updates: store_alias = updates.pop("store_alias") + if not isinstance(store_alias, str): + raise TypeError("store_alias must be a string.") approved["store_alias"] = store_alias if "max_instances" in updates: @@ -206,11 +208,9 @@ def __setstate__(self, state: "TaskState") -> "Task": # type: ignore for name, value in self.__dict__.items(): if name == "fn": self.__dict__[name] = ref_to_obj(value) - - for name, value in state.__private_attributes__.items(): - if name == "fn": - value = ref_to_obj(value) - object_setattr(self, name, value) + # the task was serialized in a store, so it is active + self.submitted = True + self.pending = False return self def __getstate__(self) -> "TaskState": # type: ignore @@ -272,8 +272,13 @@ def __str__(self) -> str: def __call__(self, fn: DecoratedFn) -> DecoratedFn: new_dict: Dict[str, Any] = dict(self.__dict__) new_dict.pop("pending", None) + new_dict.pop("submitted", None) new_dict.pop("fn_reference", None) new_dict["fn"] = fn + if new_dict["store_alias"] is None: + del new_dict["store_alias"] + if new_dict["executor"] is None: + del new_dict["executor"] replace_existing = True if not new_dict.get("id"): replace_existing = False @@ -282,6 +287,7 @@ def __call__(self, fn: DecoratedFn) -> DecoratedFn: task = self.__class__(**new_dict) scheduler = self.scheduler if scheduler is not None: + # in decorator mode tasks are simply started scheduler.add_task(task, replace_existing=replace_existing) if not replace_existing: if not hasattr(fn, "asyncz_tasks"): diff --git a/asyncz/tasks/types.py b/asyncz/tasks/types.py index cbf6b7b..2b888dd 100644 --- a/asyncz/tasks/types.py +++ b/asyncz/tasks/types.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from datetime import datetime +from datetime import datetime, tzinfo from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar from asyncz.schedulers.types import SchedulerType @@ -20,16 +20,19 @@ class TaskType(TaskDefaultsType, ABC): """BaseType of task.""" id: Optional[str] = None - pending: bool = True name: Optional[str] = None next_run_time: Optional[datetime] = None - store_alias: Optional[str] = None - scheduler: Optional[SchedulerType] = None - trigger: Optional[TriggerType] = None - executor: Optional[str] = None fn: Optional[Callable[..., Any]] = None args: Sequence[Any] kwargs: Dict[str, Any] + # are set by add_task if not set + store_alias: Optional[str] = None + executor: Optional[str] = None + trigger: Optional[TriggerType] = None + scheduler: Optional[SchedulerType] = None + # are exclusively set by scheduler + pending: bool = True + submitted: bool = False @abstractmethod def update_task(self, **updates: Any) -> TaskType: @@ -92,7 +95,7 @@ def delete(self) -> TaskType: return self @abstractmethod - def get_run_times(self, now: datetime) -> List[datetime]: + def get_run_times(self, timezone: tzinfo, now: datetime) -> List[datetime]: """ Computes the scheduled run times `next_run_time` and `now`, inclusive. 
""" diff --git a/asyncz/triggers/combination.py b/asyncz/triggers/combination.py index 65cf6ae..7fa2244 100644 --- a/asyncz/triggers/combination.py +++ b/asyncz/triggers/combination.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, tzinfo from typing import ClassVar, Optional, Union from asyncz.triggers.base import BaseCombinationTrigger @@ -17,14 +17,14 @@ class AndTrigger(BaseCombinationTrigger): alias: ClassVar[str] = "and" def get_next_trigger_time( - self, previous_time: Optional[datetime], now: Optional[datetime] = None + self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None ) -> Union[datetime, None]: if now is None: - now = datetime.now() + now = datetime.now(timezone) while True: trigger_times = [] for trigger in self.triggers: - next_trigger_time = trigger.get_next_trigger_time(previous_time, now) + next_trigger_time = trigger.get_next_trigger_time(timezone, previous_time, now) # bail out early if next_trigger_time is None: return None @@ -32,6 +32,7 @@ def get_next_trigger_time( if min(trigger_times) == max(trigger_times): return self.apply_jitter(trigger_times[0], self.jitter, now) else: + # recheck now = max(trigger_times) @@ -48,13 +49,13 @@ class OrTrigger(BaseCombinationTrigger): alias: ClassVar[str] = "or" def get_next_trigger_time( - self, previous_time: Optional[datetime], now: Optional[datetime] = None + self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None ) -> Union[datetime, None]: if now is None: - now = datetime.now() + now = datetime.now(timezone) trigger_times = [] for trigger in self.triggers: - next_trigger_time = trigger.get_next_trigger_time(previous_time, now) + next_trigger_time = trigger.get_next_trigger_time(timezone, previous_time, now) if next_trigger_time is not None: trigger_times.append(next_trigger_time) diff --git a/asyncz/triggers/cron/trigger.py b/asyncz/triggers/cron/trigger.py index 0f7e7d1..d6bfdb9 100644 --- a/asyncz/triggers/cron/trigger.py +++ b/asyncz/triggers/cron/trigger.py @@ -1,8 +1,6 @@ from datetime import datetime, timedelta, tzinfo from typing import Any, ClassVar, Optional, Tuple, Union -from tzlocal import get_localzone - from asyncz.datastructures import CronState from asyncz.triggers.base import BaseTrigger from asyncz.triggers.cron.constants import DEFAULT_VALUES @@ -56,6 +54,7 @@ class CronTrigger(BaseTrigger): """ alias: ClassVar[str] = "cron" + timezone: Optional[tzinfo] = None def __init__( self, @@ -111,10 +110,10 @@ def __init__( elif isinstance(end_at, datetime) and end_at.tzinfo: self.timezone = end_at.tzinfo else: - self.timezone = get_localzone() + self.timezone = None - self.start_at = to_datetime(start_at, self.timezone, "start_at") - self.end_at = to_datetime(end_at, self.timezone, "end_at") + self.start_at = to_datetime(start_at, self.timezone, "start_at", require_tz=False) + self.end_at = to_datetime(end_at, self.timezone, "end_at", require_tz=False) self.jitter = jitter values = { @@ -206,7 +205,9 @@ def increment_field_value(self, dateval: datetime, field_number: int) -> Tuple[d difference = datetime(**values) - dateval.replace(tzinfo=None) return normalize(dateval + difference), field_number - def set_field_value(self, dateval: datetime, field_number: int, new_value: Any) -> datetime: + def set_field_value( + self, dateval: datetime, field_number: int, new_value: Any, timezone: tzinfo + ) -> datetime: values = {} for i, field in enumerate(self.fields): if field.real: @@ -217,22 +218,34 @@ def 
set_field_value(self, dateval: datetime, field_number: int, new_value: Any) else: values[field.name] = new_value - return localize(datetime(**values), self.timezone) + return localize(datetime(**values), timezone) def get_next_trigger_time( - self, previous_time: Optional[datetime], now: Optional[datetime] = None + self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None ) -> Union[datetime, None]: if now is None: - now = datetime.now() + now = datetime.now(timezone) + start_at_field = ( + self.start_at.replace(tzinfo=timezone) + if self.start_at and self.start_at.tzinfo is None + else self.start_at + ) + end_at_field = ( + self.end_at.replace(tzinfo=timezone) + if self.end_at and self.end_at.tzinfo is None + else self.end_at + ) if previous_time: start_at = min(now, previous_time + timedelta(microseconds=1)) if start_at == previous_time: start_at += timedelta(microseconds=1) else: - start_at = max(now, self.start_at) if self.start_at else now + start_at = max(now, start_at_field) if start_at_field else now + if start_at.tzinfo != timezone: + start_at = start_at.astimezone(timezone) fieldnum = 0 - next_date = datetime_ceil(start_at).astimezone(self.timezone) + next_date = datetime_ceil(start_at) while 0 <= fieldnum < len(self.fields): field = self.fields[fieldnum] curr_value = field.get_value(next_date) @@ -242,19 +255,19 @@ def get_next_trigger_time( next_date, fieldnum = self.increment_field_value(next_date, fieldnum - 1) elif next_value > curr_value: if field.real: - next_date = self.set_field_value(next_date, fieldnum, next_value) + next_date = self.set_field_value(next_date, fieldnum, next_value, timezone) fieldnum += 1 else: next_date, fieldnum = self.increment_field_value(next_date, fieldnum) else: fieldnum += 1 - if self.end_at and next_date > self.end_at: + if end_at_field and next_date > end_at_field: return None if fieldnum >= 0: next_date = self.apply_jitter(next_date, self.jitter, now) - return min(next_date, self.end_at) if self.end_at else next_date + return min(next_date, end_at_field) if end_at_field else next_date return None def __getstate__(self) -> Any: diff --git a/asyncz/triggers/date.py b/asyncz/triggers/date.py index 871bc80..faf599d 100644 --- a/asyncz/triggers/date.py +++ b/asyncz/triggers/date.py @@ -1,8 +1,6 @@ from datetime import datetime, tzinfo from typing import Any, ClassVar, Optional, Union -from tzlocal import get_localzone - from asyncz.datastructures import DateState from asyncz.triggers.base import BaseTrigger from asyncz.utils import datetime_repr, to_datetime, to_timezone @@ -26,7 +24,7 @@ def __init__( timezone: Optional[Union[tzinfo, str]] = None, **kwargs: Any, ): - timezone = to_timezone(timezone) or get_localzone() + timezone = to_timezone(timezone) if run_at is not None: kwargs["run_at"] = to_datetime(run_at, timezone, "run_at") else: @@ -35,10 +33,10 @@ def __init__( super().__init__(**kwargs) def get_next_trigger_time( - self, previous_time: Optional[datetime], now: Optional[datetime] = None + self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None ) -> Union[datetime, None]: if previous_time is None: - return self.run_at + return self.run_at.astimezone(timezone) return None def __getstate__(self) -> Any: diff --git a/asyncz/triggers/interval.py b/asyncz/triggers/interval.py index cd5e479..9095b94 100644 --- a/asyncz/triggers/interval.py +++ b/asyncz/triggers/interval.py @@ -2,8 +2,6 @@ from math import ceil from typing import Any, ClassVar, Optional, Union -from tzlocal 
import get_localzone - from asyncz.datastructures import IntervalState from asyncz.triggers.base import BaseTrigger from asyncz.utils import datetime_repr, normalize, timedelta_seconds, to_datetime, to_timezone @@ -26,6 +24,7 @@ class IntervalTrigger(BaseTrigger): """ alias: ClassVar[str] = "interval" + timezone: Optional[tzinfo] = None def __init__( self, @@ -57,33 +56,46 @@ def __init__( elif isinstance(end_at, datetime) and end_at.tzinfo: self.timezone = end_at.tzinfo else: - self.timezone = get_localzone() + self.timezone = None start_at = start_at or (datetime.now(self.timezone) + self.interval) - self.start_at = to_datetime(start_at, self.timezone, "start_at") - self.end_at = to_datetime(end_at, self.timezone, "end_at") + self.start_at = to_datetime(start_at, self.timezone, "start_at", require_tz=False) + self.end_at = to_datetime(end_at, self.timezone, "end_at", require_tz=False) def get_next_trigger_time( - self, previous_time: Optional[datetime], now: Union[datetime, None] = None + self, + timezone: tzinfo, + previous_time: Optional[datetime], + now: Union[datetime, None] = None, ) -> Union[datetime, None]: if now is None: - now = datetime.now() + now = datetime.now(timezone) next_trigger_time: Optional[datetime] + start_at = ( + self.start_at.replace(tzinfo=timezone) + if self.start_at and self.start_at.tzinfo is None + else self.start_at + ) if previous_time: next_trigger_time = previous_time + self.interval - elif self.start_at > now: - next_trigger_time = self.start_at + elif start_at > now: + next_trigger_time = start_at else: - time_difference_seconds = timedelta_seconds(now - self.start_at) + time_difference_seconds = timedelta_seconds(now - start_at) next_interval_number = int(ceil(time_difference_seconds / self.interval_size)) - next_trigger_time = self.start_at + self.interval * next_interval_number + next_trigger_time = start_at + self.interval * next_interval_number if self.jitter is not None: next_trigger_time = self.apply_jitter( next_trigger_time=next_trigger_time, jitter=self.jitter, now=now ) + end_at = ( + self.end_at.replace(tzinfo=timezone) + if self.end_at and self.end_at.tzinfo is None + else self.end_at + ) - if not self.end_at or next_trigger_time <= self.end_at: + if not end_at or next_trigger_time <= end_at: return normalize(value=next_trigger_time) return None diff --git a/asyncz/triggers/types.py b/asyncz/triggers/types.py index 048804a..470da94 100644 --- a/asyncz/triggers/types.py +++ b/asyncz/triggers/types.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from datetime import datetime +from datetime import datetime, tzinfo from typing import ClassVar, Optional, Union, overload @@ -10,7 +10,7 @@ class TriggerType(ABC): @abstractmethod def get_next_trigger_time( - self, previous_time: Optional[datetime], now: Optional[datetime] = None + self, timezone: tzinfo, previous_time: Optional[datetime], now: Optional[datetime] = None ) -> Union[datetime, None]: """ Returns the next datetime to trigger. If the datetime cannot be calculated, then returns None. 
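
The trigger changes above thread the scheduler's timezone through `get_next_trigger_time(timezone, previous_time, now)` instead of each trigger falling back to `tzlocal` on its own. A minimal sketch of the new calling convention (the five-minute interval and the UTC zone are illustrative, not taken from the patch):

```python
from datetime import datetime, timezone

from asyncz.triggers import IntervalTrigger

# No timezone passed to the trigger; its own timezone stays None.
trigger = IntervalTrigger(minutes=5)
now = datetime.now(timezone.utc)

# The scheduler now hands over its own timezone as the first argument.
next_time = trigger.get_next_trigger_time(timezone.utc, None, now)
print(next_time)  # a timezone-aware datetime computed against the scheduler timezone
```
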
diff --git a/asyncz/typing.py b/asyncz/typing.py index 81cee57..96cb94d 100644 --- a/asyncz/typing.py +++ b/asyncz/typing.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Type, TypeVar, Union +from typing import Any, Dict class Undefined: @@ -17,8 +17,8 @@ def __repr__(self) -> str: undefined = Undefined() -UndefinedTypeVar = TypeVar("UndefinedTypeVar", bound=Undefined) -UndefinedType = Union[Type[UndefinedTypeVar], Undefined] +# legacy shim +UndefinedType = Undefined DictAny = Dict[Any, Any] DictStrAny = Dict[str, Any] diff --git a/asyncz/utils.py b/asyncz/utils.py index 0c7cd00..fab1394 100644 --- a/asyncz/utils.py +++ b/asyncz/utils.py @@ -1,7 +1,6 @@ import inspect import re from asyncio import iscoroutinefunction -from calendar import timegm from datetime import date, datetime, time, timedelta, tzinfo from datetime import timezone as dttz from functools import partial @@ -10,18 +9,21 @@ from asyncz.exceptions import AsynczException, AsynczLookupError try: - from threading import TIMEOUT_MAX + from tzlocal import get_localzone as _get_localzone + + def get_localzone() -> tzinfo: + return _get_localzone() except ImportError: - TIMEOUT_MAX = 4294967 + + def get_localzone() -> tzinfo: + return dttz.utc + + try: from zoneinfo import ZoneInfo # type: ignore[import-not-found,unused-ignore] except ImportError: from backports.zoneinfo import ZoneInfo # type: ignore[no-redef,unused-ignore] -BOOL_VALIDATION = { - "true": ["true", "yes", "on", "y", "t", "1", True], - "false": ["false", "no", "off", "f", "0", False], -} DATE_REGEX = re.compile( r"(?P\d{4})-(?P\d{1,2})-(?P\d{1,2})" @@ -65,19 +67,6 @@ def to_float(value: Union[str, int, float, None]) -> Optional[float]: return None -def to_bool(value: Union[str, bool, None]) -> bool: - """ - Converts the given value into a boolean. - """ - if isinstance(value, str): - value = value.strip().lower() - if value in BOOL_VALIDATION["true"]: - return True - elif value in BOOL_VALIDATION["false"]: - return False - return False - - @overload def to_timezone(value: None) -> None: ... @@ -91,7 +80,8 @@ def to_timezone(value: Any) -> Optional[tzinfo]: Converts a value to timezone object. """ if isinstance(value, str): - return ZoneInfo(value) + # python 3.8 typing issue + return cast(tzinfo, ZoneInfo(value)) if isinstance(value, tzinfo): return value if value is not None: @@ -99,18 +89,33 @@ def to_timezone(value: Any) -> Optional[tzinfo]: return None +def to_timezone_with_fallback(value: Any = None) -> tzinfo: + timezone: Optional[tzinfo] = to_timezone(value) + if timezone is None: + timezone = get_localzone() + return timezone + + @overload -def to_datetime(value: None, tz: Union[tzinfo, str], arg_name: str) -> None: ... +def to_datetime( + value: None, tz: Union[tzinfo, str, None], arg_name: str, require_tz: bool = True +) -> None: ... @overload def to_datetime( - value: Union[str, datetime, date], tz: Union[tzinfo, str], arg_name: str + value: Union[str, datetime, date], + tz: Union[tzinfo, str, None], + arg_name: str, + require_tz: bool = True, ) -> Union[datetime, Any]: ... def to_datetime( - value: Union[str, datetime, date, None], tz: Union[tzinfo, str], arg_name: str + value: Union[str, datetime, date, None], + tz: Union[tzinfo, str, None], + arg_name: str, + require_tz: bool = True, ) -> Union[datetime, None, Any]: """ Converts the given value to a timezone compatible aware datetime object. 
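
To make the optional-`tzlocal` behaviour concrete, here is an illustrative sketch of the two helpers this file touches (the date string is made up): `to_timezone_with_fallback` resolves an explicit value, else the local zone when `tzlocal` is installed, else UTC, and `to_datetime` can now keep a naive datetime when called with `require_tz=False`:

```python
from asyncz.utils import to_datetime, to_timezone_with_fallback

tz = to_timezone_with_fallback(None)  # local zone if tzlocal is installed, otherwise UTC

aware = to_datetime("2024-08-02 10:00:00", tz, "start_at")  # localized to tz
naive = to_datetime("2024-08-02 10:00:00", None, "start_at", require_tz=False)  # kept naive
```
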
@@ -153,12 +158,15 @@ def to_datetime( return _datetime if tz is None: + if not require_tz: + return _datetime raise AsynczException( detail=f'The "tz" argument must be specified if {arg_name} has no timezone information' ) if isinstance(tz, str): - tz = ZoneInfo(tz) + # python 3.8 typing issue + tz = cast(tzinfo, ZoneInfo(tz)) return localize(_datetime, tz) @@ -176,7 +184,7 @@ def datetime_to_utc_timestamp(timeval: Optional[datetime]) -> Optional[float]: Converts a datetime instance to a timestamp. """ if timeval is not None: - return timegm(timeval.utctimetuple()) + timeval.microsecond / 1000000 + return timeval.timestamp() return None @@ -403,11 +411,13 @@ def iscoroutinefunction_partial(f: Any) -> bool: def normalize(value: datetime) -> datetime: + # applies dst change return datetime.fromtimestamp(value.timestamp(), value.tzinfo) def localize(value: datetime, tzinfo: tzinfo) -> datetime: if hasattr(tzinfo, "localize"): + # pytz localize return cast(datetime, tzinfo.localize(value)) return normalize(value.replace(tzinfo=tzinfo)) diff --git a/docker-compose.yml b/docker-compose.yml index 100fa09..6dd7e35 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: "2" +version: "3" services: redis: image: redis diff --git a/docs/release-notes.md b/docs/release-notes.md index d675e7c..7d3abf3 100644 --- a/docs/release-notes.md +++ b/docs/release-notes.md @@ -1,5 +1,27 @@ # Release Notes +## 0.11.0 + +### Added + +- Allow submitting paused tasks. +- Allow changing task attributes in-place when submitting with `add_task`. +- Allow selecting the logger (classical, loguru). +- Allow naming schedulers with an extra logger name. + +### Fixed + +- `remove_all_tasks` didn't check the store of tasks when `pending_tasks` was used (stopped scheduler). + +### Changed + +- Replace `UndefinedType` with `Undefined`. A shim is provided for backwards compatibility. +- `add_task` now has more arguments with undefined as the default. +- `pending_tasks` no longer contains the store alias. +- `tzlocal` is now optional. +- Tasks use the scheduler's timezone for triggers which require a timezone. +- `loguru` is now optional. + ## 0.10.1 ### Added diff --git a/docs/schedulers.md b/docs/schedulers.md index 80d69d1..a777b4e 100644 --- a/docs/schedulers.md +++ b/docs/schedulers.md @@ -85,12 +85,31 @@ instantiation. #### Third option -The third option is by starting the scheduler and use the `configure` method. +The third option is to start the scheduler and then use the `setup` method. ```python {!> ../docs_src/schedulers/method3.py !} ``` +## Changing logger name and class + +`asyncz` uses a custom way of logging: it builds up a dictionary store of loggers that follow the standard logger interface. +They are retrieved from schedulers via their alias name plus a prefix. + +e.g. `asyncz.schedulers`, `asyncz.stores.default`, `asyncz.executors.default` + +The scheduler has an optional parameter named `logger_name`. If set, the scheduler's logger becomes: + +`asyncz.schedulers.<logger_name>` + +By default `asyncz` uses loguru as its logger (when available) and falls back to classical logging.
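Reviewer note (not part of the patch): a small sketch of how the naming and selection described above might look in user code; the constructor arguments mirror the new tests further down, the rest is assumption. The ways to switch the logger class are listed right below.

```python
from asyncz.schedulers.asyncio import AsyncIOScheduler

# Default: logger name "asyncz.schedulers"; loguru is used when installed,
# otherwise the classic logging module.
scheduler = AsyncIOScheduler()

# With logger_name="worker" the scheduler logs under "asyncz.schedulers.worker".
named = AsyncIOScheduler(logger_name="worker")

# Force the classic backend for a single scheduler (string path or the class itself).
classic = AsyncIOScheduler(loggers_class="asyncz.schedulers.base:ClassicLogging")
```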
+ +If this is not desired, there are a few ways to change it: + +- setting the value of `loggers_class`, either via global config or directly, to `asyncz.schedulers.base:ClassicLogging` (or the class itself instead of the string) when creating a scheduler object +- setting `asyncz.schedulers.base.default_loggers_class` to `ClassicLogging` (same module; only a class is possible here) + + ## Starting and stopping the scheduler Every scheduler inherits from the [BaseScheduler](#basescheduler) and therefore implement the @@ -145,6 +164,30 @@ and simpler. When adding tasks there is not a specific order. **You can add tasks at any given time**. If the scheduler is not yet running, once it does it will add the tasks to it. + +### Parameters + +* **fn_or_task** - (positional or via this name) The callable function to execute or the task to submit. +* **id** - The unique identifier of this task. Leave empty to autogenerate an id or switch to the +* **name** - The description of this task. +* **args** - Positional arguments to the callable. +* **kwargs** - Keyword arguments to the callable. +* **coalesce** - Whether to only run the task once when several run times are due. +* **trigger** - The trigger object that controls the schedule of this task. +* **executor** - The name of the executor that will run this task. +* **mistrigger_grace_time** - The time (in seconds) by which this task's execution is allowed to +be late (None means "allow the task to run no matter how late it is"). +* **max_instances** - The maximum number of concurrently executing instances allowed for this task. +* **next_run_time** - The next scheduled run time of this task. If set to None, the task starts paused. +* **replace_existing** - Whether the submitted task replaces an existing task with the same id. Otherwise a `ConflictId` error is raised. + +!!! Note + `add_task` has a special pause mode: `next_run_time` can be set to None to start a Task paused. This also works with Task objects. + +!!! Tip + When submitting a Task object, most attributes can be changed by providing arguments, e.g. trigger, name and other kwargs. However, the task is updated in-place; no copy is made. + This has interesting effects: for example, a decorator-mode Task can be turned into a normal one by providing an id, and it is submitted in-place. + ## Add tasks as decorator When leaving out the `fn` parameter, you get back a decorator mode Task. diff --git a/docs/tasks.md b/docs/tasks.md index 8c8b7d5..9f679eb 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -22,7 +22,8 @@ from asyncz.tasks import Task * **mistrigger_grace_time** - The time (in seconds) how much this task's execution is allowed to be late (None means "allow the task to run no matter how late it is"). * **max_instances** - The maximum number of concurrently executing instances allowed for this task. -* **next_run_time** - The next scheduled run time of this task. +* **next_run_time** - The next scheduled run time of this task.
(Note: this does not have the pause-mode effect of add_task) + ## Create a task diff --git a/docs_src/schedulers/add_task.py b/docs_src/schedulers/add_task.py index 1e15221..8839cab 100644 --- a/docs_src/schedulers/add_task.py +++ b/docs_src/schedulers/add_task.py @@ -26,13 +26,13 @@ def check_status(): # Create the tasks # Run every Monday, Wednesday and Friday scheduler.add_task( - fn=send_email_newsletter, + send_email_newsletter, trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"), ) # Run every 2 minutes scheduler.add_task( - fn=collect_www_info, + collect_www_info, trigger=IntervalTrigger(minutes=2), max_instances=1, replace_existing=True, @@ -41,7 +41,7 @@ def check_status(): # Run every 10 minutes scheduler.add_task( - fn=check_status, + check_status, trigger=IntervalTrigger(minutes=10), max_instances=1, replace_existing=True, @@ -52,7 +52,7 @@ def check_status(): feed_data = collect_www_info() scheduler.add_task( - fn=send_email_newsletter, + fn_or_task=send_email_newsletter, args=[feed_data], trigger=IntervalTrigger(minutes=10), max_instances=1, @@ -70,6 +70,7 @@ def check_status(): replace_existing=True, coalesce=False, ) +# you can update most attributes here. Note: a task can only be submitted once scheduler.add_task(task) # Use Task as decorator (leave fn empty) @@ -85,3 +86,15 @@ def check_status(): # Start the scheduler scheduler.start() + +# Add a paused Task +scheduler.add_task( + send_email_newsletter, + args=[feed_data], + trigger=IntervalTrigger(minutes=10), + max_instances=1, + replace_existing=True, + coalesce=False, + # this pauses the task on submit + next_run_time=None, +) diff --git a/docs_src/schedulers/method1.py b/docs_src/schedulers/method1.py index 9d7e46f..b914e10 100644 --- a/docs_src/schedulers/method1.py +++ b/docs_src/schedulers/method1.py @@ -19,7 +19,25 @@ # Set the defaults task_defaults = {"coalesce": False, "max_instances": 4} -# Start the scheduler +# Create the scheduler scheduler = AsyncIOScheduler( stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc ) + +# Start the scheduler +with scheduler: + # note: you can also call start() and shutdown() manually + # Nesting is also not a problem (start and shutdown are refcounted; only the outermost scope actually starts and shuts down the scheduler) + + scheduler.start() + + scheduler.stop() + +# doing it manually gives you more control, e.g.: +scheduler.start(paused=True) +scheduler.resume() + +# no-op because this is not the outermost scope +with scheduler: + ... +scheduler.shutdown(wait=False) diff --git a/docs_src/schedulers/method2.py b/docs_src/schedulers/method2.py index b7d106e..5234805 100644 --- a/docs_src/schedulers/method2.py +++ b/docs_src/schedulers/method2.py @@ -1,6 +1,6 @@ from asyncz.schedulers.asyncio import AsyncIOScheduler -# Start the scheduler +# Create the scheduler scheduler = AsyncIOScheduler( global_config={ "asyncz.stores.mongo": {"type": "mongodb"}, @@ -15,3 +15,8 @@ "asyncz.task_defaults.timezone": "UTC", }, ) + +# Start the scheduler +with scheduler: + ... + # your code diff --git a/docs_src/schedulers/method3.py b/docs_src/schedulers/method3.py index 1bb4e29..3cbdcda 100644 --- a/docs_src/schedulers/method3.py +++ b/docs_src/schedulers/method3.py @@ -18,7 +18,7 @@ # Set the defaults task_defaults = {"coalesce": False, "max_instances": 4} -# Start the scheduler +# Create the scheduler scheduler = AsyncIOScheduler() ## Add some tasks here or anything else (for instance 3 tasks) @@ -27,3 +27,8 @@ scheduler.add_task(...)
scheduler.setup(stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc) + +# Start the scheduler +with scheduler: + ... + # your code diff --git a/docs_src/tasks/create_task.py b/docs_src/tasks/create_task.py index 1873eba..6948de5 100644 --- a/docs_src/tasks/create_task.py +++ b/docs_src/tasks/create_task.py @@ -22,3 +22,16 @@ def check_status(): ) scheduler.start() + +# or manually submit one with a changed trigger + +t = Task( + id="my-task2", + fn=check_status, + max_instances=1, + coalesce=True, +) + +scheduler.add_task( + t, trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1) +) diff --git a/docs_src/triggers/custom.py b/docs_src/triggers/custom.py index bbb1f79..9409c69 100644 --- a/docs_src/triggers/custom.py +++ b/docs_src/triggers/custom.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, tzinfo from typing import Optional, Union from loguru import logger @@ -11,7 +11,7 @@ class CustomTrigger(BaseTrigger): alias: str = "custom" def get_next_trigger_time( - self, previous_time: datetime, now: Optional[datetime] = None + self, timezone: tzinfo, previous_time: datetime, now: Optional[datetime] = None ) -> Union[datetime, None]: # Add logic for the next trigger time of the custom trigger ... diff --git a/pyproject.toml b/pyproject.toml index 4590eb6..146774d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,9 +40,7 @@ classifiers = [ ] dependencies = [ "cryptography", - "loguru>=0.7.0,<0.8.0", "pydantic>=2.5.3,<3.0.0", - "tzlocal>=4.0,<6.0", 'backports.zoneinfo;python_version<"3.9"', ] keywords = [ @@ -70,7 +68,10 @@ Source = "https://github.com/dymmond/asyncz" dependencies = ["pre-commit>=2.17.0,<3.0.0", "ipdb", "types-tzlocal"] [project.optional-dependencies] +localtime=["tzlocal"] +loguru=["loguru>=0.7.0,<0.8.0"] testing = [ + "asyncz[localtime,loguru]", "pymongo>=4.3.3,<5.0.0", "pytest>=7.1.3,<9.0.0", "pytest-cov >=2.12.0,<6.0.0", diff --git a/tests/conftest.py b/tests/conftest.py index 9fb0d22..1c62024 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,14 +5,14 @@ import pytz from asyncz.schedulers.asyncio import AsyncIOScheduler -from asyncz.schedulers.base import BaseScheduler +from asyncz.schedulers.base import BaseScheduler, LoguruLogging from asyncz.tasks import Task @pytest.fixture(scope="function") def timezone(monkeypatch): tz = pytz.timezone("Europe/London") - monkeypatch.setattr("asyncz.schedulers.base.get_localzone", Mock(return_value=tz)) + monkeypatch.setattr("asyncz.utils.get_localzone", Mock(return_value=tz)) return tz @@ -69,7 +69,8 @@ def task_defaults(timezone): @pytest.fixture(scope="function") def create_task(task_defaults, timezone): def create(**kwargs): - kwargs.setdefault("scheduler", Mock(BaseScheduler, timezone=timezone)) + mock_scheduler = Mock(BaseScheduler, timezone=timezone, loggers=LoguruLogging()) + kwargs.setdefault("scheduler", mock_scheduler) task_kwargs = task_defaults.copy() task_kwargs.update(kwargs) task_kwargs["trigger"] = AsyncIOScheduler().create_trigger( diff --git a/tests/test_executors.py b/tests/test_executors.py index 299913c..09c5c8f 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -14,7 +14,7 @@ from asyncz.executors.asyncio import AsyncIOExecutor from asyncz.executors.base import run_coroutine_task, run_task from asyncz.schedulers.asyncio import AsyncIOScheduler -from asyncz.schedulers.base import BaseScheduler +from asyncz.schedulers.base import BaseScheduler, LoguruLogging from asyncz.tasks import Task @@ 
-22,6 +22,7 @@ def scheduler_mocked(timezone): scheduler_ = Mock(BaseScheduler, timezone=timezone) scheduler_.create_lock = MagicMock() + scheduler_.loggers = LoguruLogging() return scheduler_ @@ -32,7 +33,7 @@ def executor(request, scheduler_mocked): executor = ThreadPoolExecutor() else: - from asyncz.executors.pool import ProcessPoolExecutor + from asyncz.executors.process_pool import ProcessPoolExecutor executor = ProcessPoolExecutor() @@ -196,7 +197,6 @@ async def fn(): raise Exception("dummy") fake_task = Mock(Task, id="dummy", fn=fn, args=(), kwargs={}, mistrigger_grace_time=1) - with patch("loguru.logger"): for _ in range(5): await run_coroutine_task(fake_task, "foo", [datetime.now(pytz.utc)], logger) diff --git a/tests/test_integrations.py b/tests/test_integrations.py index a19dad3..bb17d6d 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -1,4 +1,5 @@ import contextlib +import time import pytest from esmerald import Gateway, Request @@ -16,6 +17,7 @@ from starlette.testclient import TestClient from asyncz.schedulers.asyncio import AsyncIOScheduler +from asyncz.schedulers.base import ClassicLogging, LoguruLogging def get_starlette_app(): @@ -154,6 +156,14 @@ async def list_notes(request: Request) -> EsmeraldJSONResponse: return app, scheduler +@pytest.mark.parametrize( + "loggers_class_string,loggers_class", + [ + ["asyncz.schedulers.base:ClassicLogging", ClassicLogging], + ["asyncz.schedulers.base:LoguruLogging", LoguruLogging], + ], + ids=["ClassicLogging", "LoguruLogging"], +) @pytest.mark.parametrize( "get_app", [ @@ -166,8 +176,10 @@ async def list_notes(request: Request) -> EsmeraldJSONResponse: get_esmerald_app2, ], ) -def test_integrations(get_app): +def test_integrations(get_app, loggers_class_string, loggers_class): app, scheduler = get_app() + scheduler.setup(loggers_class=loggers_class_string) + assert isinstance(scheduler.loggers, loggers_class) dummy_job_called = 0 async_dummy_job_called = 0 @@ -198,6 +210,9 @@ async def async_dummy_job(): response = client.get("/notes") assert response.status_code == 200 assert response.json() == [] + # fix CancelledError, by giving scheduler more time to send the tasks to the pool + # if the pool is closed, newly submitted tasks are cancelled + time.sleep(0.01) assert not scheduler.running assert dummy_job_called == 2 diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index fec53bb..4c14de6 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -1,6 +1,6 @@ import logging import pickle -from datetime import datetime, timedelta +from datetime import datetime, timedelta, tzinfo from threading import Thread from typing import Any, List, Optional, Union from unittest.mock import MagicMock, patch @@ -36,7 +36,7 @@ ) from asyncz.executors.base import BaseExecutor from asyncz.executors.debug import DebugExecutor -from asyncz.schedulers.base import BaseScheduler +from asyncz.schedulers.base import BaseScheduler, ClassicLogging from asyncz.stores.base import BaseStore from asyncz.stores.memory import MemoryStore from asyncz.tasks import Task @@ -68,7 +68,7 @@ def __init__(self, **args): self.args = args def get_next_trigger_time( - self, previous_time: datetime, now: Optional[datetime] = None + self, timezone: tzinfo, previous_time: datetime, now: Optional[datetime] = None ) -> Union[datetime, None]: ... 
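Reviewer note (not part of the patch): the new scheduler tests below assert the reworked add_task semantics (in-place update, submit-once fuse, paused submission). A compressed sketch of that behaviour, assuming an AsyncIOScheduler can be started synchronously here as in the docs examples, with a throwaway callable:

```python
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.tasks import Task

def dummy(x, y):  # throwaway callable for the sketch
    ...

scheduler = AsyncIOScheduler()
scheduler.start(paused=True)

# A detached Task has no trigger/scheduler yet; add_task updates it in-place.
task = Task(fn=dummy, id="my-id", name="dummy", args=[1], kwargs={"y": 2})
task = scheduler.add_task(task, trigger="date", run_at="2030-06-01 08:41:00")
assert task.submitted and task.trigger is not None

# Submit-once fuse: adding the same Task object again raises.
try:
    scheduler.add_task(task)
except AssertionError:
    pass

# next_run_time=None submits the task in a paused state.
paused = scheduler.add_task(dummy, trigger="interval", hours=1, args=[3], kwargs={"y": 4}, next_run_time=None)
assert paused.next_run_time is None

scheduler.shutdown(wait=False)
```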
@@ -205,6 +205,57 @@ def test_configure(self, scheduler, global_config): } ) + def test_scheduler_change_logger_direct_class(self): + """ + Test scheduler is picking up the right loggers class + """ + scheduler = DummyScheduler(loggers_class=ClassicLogging) + scheduler.start(paused=True) + assert isinstance(scheduler.loggers, ClassicLogging) + assert scheduler.logger_name == "asyncz.schedulers" + + def test_scheduler_change_logger_class_path(self): + """ + Test scheduler is picking up the right loggers class + """ + scheduler = DummyScheduler(loggers_class="asyncz.schedulers.base:ClassicLogging") + scheduler.start(paused=True) + assert isinstance(scheduler.loggers, ClassicLogging) + assert scheduler.logger_name == "asyncz.schedulers" + + def test_scheduler_change_logger_config(self): + """ + Test scheduler is picking up the right loggers class + """ + scheduler = DummyScheduler( + global_config={"asyncz.loggers_class": "asyncz.schedulers.base:ClassicLogging"} + ) + scheduler.start(paused=True) + assert isinstance(scheduler.loggers, ClassicLogging) + assert scheduler.logger_name == "asyncz.schedulers" + + def test_scheduler_change_logger_name(self): + """ + Test scheduler is picking up the right logger name for loggers + """ + scheduler = DummyScheduler( + global_config={"asyncz.loggers_class": "asyncz.schedulers.base:ClassicLogging"}, + logger_name="test", + ) + scheduler.start(paused=True) + assert isinstance(scheduler.loggers, ClassicLogging) + assert scheduler.logger_name == "asyncz.schedulers.test" + + @patch("asyncz.schedulers.base.default_loggers_class", side_effect=ClassicLogging) + def test_scheduler_change_logger_change_default(self, patched): + """ + Test scheduler is picking up the right loggers class + """ + scheduler = DummyScheduler() + scheduler.start(paused=True) + assert isinstance(scheduler.loggers, ClassicLogging) + assert scheduler.logger_name == "asyncz.schedulers" + @pytest.mark.parametrize("method", [BaseScheduler.setup, BaseScheduler.start]) def test_scheduler_already_running(self, method, scheduler): """ @@ -249,7 +300,7 @@ def test_start(self, real_add_task, dispatch_events, scheduler, create_task): "store2": MagicMock(BaseStore), } task = create_task(fn=lambda: None) - scheduler.pending_tasks = [(task, "store1", False)] + scheduler.pending_tasks = [(task, False, True)] scheduler.start() scheduler.executors["exec1"].start.assert_called_once_with(scheduler, "exec1") @@ -262,7 +313,7 @@ def test_start(self, real_add_task, dispatch_events, scheduler, create_task): assert "default" in scheduler.executors assert "default" in scheduler.stores - scheduler.real_add_task.assert_called_once_with(task, "store1", False) + scheduler.real_add_task.assert_called_once_with(task, False, True) assert scheduler.pending_tasks == [] assert scheduler.dispatch_event.call_count == 3 @@ -434,10 +485,36 @@ def test_add_task_obj_return_value(self, scheduler, timezone): args=[1], kwargs={"y": 2}, ) + assert task.scheduler is None task = scheduler.add_task(task, trigger="date", run_at="2020-06-01 08:41:00") assert isinstance(task, Task) assert task.id == "my-id" + assert task.trigger is not None + assert task.scheduler is not None + + assert task.mistrigger_grace_time == 1 + assert task.coalesce is True + assert task.max_instances == 1 + assert task.submitted + # test that submitting works only once + with pytest.raises(AssertionError): + scheduler.add_task(task) + + def test_add_task_obj_paused_update(self, scheduler, timezone): + """Test that when a task is added to a stopped scheduler, a 
Task instance is returned.""" + task = Task( + fn=lambda x, y: None, + id="my-id", + name="dummy", + args=[1], + kwargs={"y": 2}, + ) + task = scheduler.add_task(task, trigger="date", run_at="2020-06-01 08:41:00") + + assert isinstance(task, Task) + assert task.id == "my-id" + assert task.trigger assert task.mistrigger_grace_time == 1 assert task.coalesce is True @@ -464,6 +541,25 @@ def test_add_task_pending(self, scheduler, scheduler_events): assert not task.coalesce assert task.max_instances == 6 + def test_add_paused_task(self, scheduler, scheduler_events): + scheduler.setup( + task_defaults={ + "mistrigger_grace_time": 3, + "coalesce": False, + "max_instances": 6, + } + ) + task = scheduler.add_task(lambda: None, trigger="interval", hours=1, next_run_time=None) + assert not scheduler_events + + scheduler.start() + + assert len(scheduler_events) == 3 + assert scheduler_events[2].code == TASK_ADDED + assert scheduler_events[2].task_id is task.id + + assert task.next_run_time is None + def test_add_task_id_conflict(self, scheduler): scheduler.start(paused=True) scheduler.add_task(lambda: None, "interval", id="testtask", seconds=1) @@ -567,7 +663,7 @@ def test_update_task(self, scheduler, pending, timezone): def test_reschedule_task(self, scheduler): object_setter(scheduler, "update_task", MagicMock()) - trigger = MagicMock(get_next_trigger_time=lambda previous, now: 1) + trigger = MagicMock(get_next_trigger_time=lambda timezone, previous, now: 1) object_setter(scheduler, "create_trigger", MagicMock(return_value=trigger)) scheduler.reschedule_task("my-id", "store", "date", run_at="2022-06-01 08:41:00") @@ -587,7 +683,9 @@ def test_pause_task(self, scheduler): @pytest.mark.parametrize("dead_task", [True, False], ids=["dead task", "live task"]) def test_resume_task(self, scheduler, freeze_time, dead_task): next_trigger_time = None if dead_task else freeze_time.current + timedelta(seconds=1) - trigger = MagicMock(BaseTrigger, get_next_trigger_time=lambda prev, now: next_trigger_time) + trigger = MagicMock( + BaseTrigger, get_next_trigger_time=lambda timezone, prev, now: next_trigger_time + ) returned_task = MagicMock(Task, id="foo", trigger=trigger) object_setter(scheduler, "lookup_task", MagicMock(return_value=(returned_task, "bar"))) object_setter(scheduler, "update_task", MagicMock()) diff --git a/tests/test_stores.py b/tests/test_stores.py index 7427f6d..854dc62 100644 --- a/tests/test_stores.py +++ b/tests/test_stores.py @@ -95,8 +95,11 @@ def create_add_task(timezone, create_task): def create(store, fn=dummy_task, run_at=datetime(2999, 1, 1), id=None, paused=False, **kwargs): run_at = timezone.localize(run_at) task = create_task(fn=fn, trigger="date", trigger_args={"run_at": run_at}, id=id, **kwargs) - task.next_run_time = None if paused else task.trigger.get_next_trigger_time(None, run_at) + task.next_run_time = ( + None if paused else task.trigger.get_next_trigger_time(timezone, None, run_at) + ) if store: + store.start(task.scheduler, "default") store.add_task(task) return task diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 52cf0a6..58f0200 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -80,13 +80,13 @@ def test_get_run_times(create_task, timezone): fn=dummyfn, ) - run_times = task.get_run_times(run_time) + run_times = task.get_run_times(timezone, run_time) assert run_times == [] - run_times = task.get_run_times(expected_times[0]) + run_times = task.get_run_times(timezone, expected_times[0]) assert run_times == [expected_times[0]] - run_times = 
task.get_run_times(expected_times[1]) + run_times = task.get_run_times(timezone, expected_times[1]) assert run_times == expected_times diff --git a/tests/test_triggers.py b/tests/test_triggers.py index 32d680e..ab31690 100644 --- a/tests/test_triggers.py +++ b/tests/test_triggers.py @@ -1,8 +1,7 @@ import pickle import random -from datetime import date, datetime, timedelta +from datetime import date, datetime, timedelta, tzinfo from typing import Optional -from unittest.mock import Mock import pytest import pytz @@ -25,8 +24,11 @@ def __init__(self, dt: datetime, jitter: int, **kwargs) -> None: self.dt = dt self.jitter = jitter - def get_next_trigger_time(self, previous_time: datetime, now: datetime): - return self.apply_jitter(self.dt, self.jitter, now) + def get_next_trigger_time(self, timezone: tzinfo, previous_time: datetime, now: datetime): + dt = self.dt + if dt and not dt.tzinfo: + dt = self.dt.replace(tzinfo=timezone) + return self.apply_jitter(dt, self.jitter, now) class TestDateTrigger: @@ -71,7 +73,7 @@ def test_get_next_trigger_time( previous = timezone.localize(previous) if previous else None now = timezone.localize(now) expected = timezone.localize(expected) if expected else None - assert trigger.get_next_trigger_time(previous, now) == expected + assert trigger.get_next_trigger_time(timezone, previous, now) == expected @pytest.mark.parametrize("is_dst", [True, False], ids=["daylight saving", "standard time"]) def test_dst_change(self, is_dst): @@ -87,7 +89,7 @@ def test_dst_change(self, is_dst): trigger_date = eastern.normalize(run_at + timedelta(minutes=55)) trigger = DateTrigger(run_at=trigger_date, timezone=eastern) - assert str(trigger.get_next_trigger_time(None, trigger_date)) == str(trigger_date) + assert str(trigger.get_next_trigger_time(eastern, None, trigger_date)) == str(trigger_date) def test_repr(self, timezone): trigger = DateTrigger(datetime(2022, 11, 3), timezone) @@ -111,16 +113,16 @@ def trigger(self, timezone): def test_invalid_interval(self, timezone): pytest.raises(TypeError, IntervalTrigger, "1-6", timezone=timezone) - def test_start_end_times_string(self, timezone, monkeypatch): - monkeypatch.setattr("asyncz.triggers.interval.get_localzone", Mock(return_value=timezone)) + def test_start_end_times_string(self): trigger = IntervalTrigger(start_at="2022-11-05 05:06:53", end_at="2023-11-05 05:11:32") - assert trigger.start_at == timezone.localize(datetime(2022, 11, 5, 5, 6, 53)) - assert trigger.end_at == timezone.localize(datetime(2023, 11, 5, 5, 11, 32)) + assert trigger.start_at == datetime(2022, 11, 5, 5, 6, 53) + assert trigger.end_at == datetime(2023, 11, 5, 5, 11, 32) def test_before(self, trigger, timezone): """Tests that if "start_at" is later than "now", it will return start_at.""" now = trigger.start_at - timedelta(seconds=2) - assert trigger.get_next_trigger_time(None, now) == trigger.start_at + assert now.tzinfo is not None + assert trigger.get_next_trigger_time(timezone, None, now) == trigger.start_at def test_within(self, trigger, timezone): """ @@ -128,12 +130,16 @@ def test_within(self, trigger, timezone): interval. 
""" now = trigger.start_at + timedelta(microseconds=1000) - assert trigger.get_next_trigger_time(None, now) == trigger.start_at + trigger.interval + assert now.tzinfo is not None + assert ( + trigger.get_next_trigger_time(timezone, None, now) + == trigger.start_at + trigger.interval + ) def test_no_start_at(self, timezone): trigger = IntervalTrigger(seconds=2, timezone=timezone) now = datetime.now(timezone) - assert (trigger.get_next_trigger_time(None, now) - now) <= timedelta(seconds=2) + assert (trigger.get_next_trigger_time(timezone, None, now) - now) <= timedelta(seconds=2) def test_end_at(self, timezone): """Tests that the interval trigger won't return any datetimes past the set end time.""" @@ -145,9 +151,11 @@ def test_end_at(self, timezone): timezone=timezone, ) assert trigger.get_next_trigger_time( - None, start_at + timedelta(minutes=2) + timezone, None, start_at + timedelta(minutes=2) ) == start_at.replace(minute=5) - assert trigger.get_next_trigger_time(None, start_at + timedelta(minutes=6)) is None + assert ( + trigger.get_next_trigger_time(timezone, None, start_at + timedelta(minutes=6)) is None + ) def test_dst_change(self): """ @@ -161,11 +169,15 @@ def test_dst_change(self): datetime_edt = eastern.localize(datetime(2013, 3, 10, 1, 5), is_dst=False) correct_next_date = eastern.localize(datetime(2013, 3, 10, 3), is_dst=True) - assert str(trigger.get_next_trigger_time(None, datetime_edt)) == str(correct_next_date) + assert str(trigger.get_next_trigger_time(eastern, None, datetime_edt)) == str( + correct_next_date + ) datetime_est = eastern.localize(datetime(2013, 11, 3, 1, 5), is_dst=True) correct_next_date = eastern.localize(datetime(2013, 11, 3, 1), is_dst=False) - assert str(trigger.get_next_trigger_time(None, datetime_est)) == str(correct_next_date) + assert str(trigger.get_next_trigger_time(eastern, None, datetime_est)) == str( + correct_next_date + ) def test_space_in_expr(self, timezone): trigger = CronTrigger(day="1-2, 4-7", timezone=timezone) @@ -208,7 +220,7 @@ def test_jitter_produces_different_valid_results(self, timezone): results = set() for _ in range(0, 100): - next_trigger_time = trigger.get_next_trigger_time(None, now) + next_trigger_time = trigger.get_next_trigger_time(timezone, None, now) results.add(next_trigger_time) assert timedelta(seconds=2) <= (next_trigger_time - now) <= timedelta(seconds=8) assert len(results) > 1 @@ -229,7 +241,7 @@ def test_jitter_dst_change(self, trigger_args, start_at, start_at_dst, correct_n correct_next_date = timezone.localize(correct_next_date, is_dst=not start_at_dst) for _ in range(0, 100): - next_trigger_time = trigger.get_next_trigger_time(None, start_at + epsilon) + next_trigger_time = trigger.get_next_trigger_time(timezone, None, start_at + epsilon) assert abs(next_trigger_time - correct_next_date) <= timedelta(seconds=5) def test_jitter_with_end_at(self, timezone): @@ -238,7 +250,7 @@ def test_jitter_with_end_at(self, timezone): trigger = IntervalTrigger(seconds=5, jitter=5, end_at=end_at) for _ in range(0, 100): - next_trigger_time = trigger.get_next_trigger_time(None, now) + next_trigger_time = trigger.get_next_trigger_time(timezone, None, now) assert next_trigger_time is None or next_trigger_time <= end_at @@ -263,13 +275,16 @@ def trigger(self, timezone): ) def test_next_fire_time(self, trigger, timezone, start_time, expected): expected = timezone.localize(expected) if expected else None - assert trigger.get_next_trigger_time(None, timezone.localize(start_time)) == expected + assert ( + 
trigger.get_next_trigger_time(timezone, None, timezone.localize(start_time)) + == expected + ) def test_jitter(self, trigger, timezone): trigger.jitter = 5 start_time = expected = timezone.localize(datetime(2022, 8, 6)) for _ in range(100): - next_trigger_time = trigger.get_next_trigger_time(None, start_time) + next_trigger_time = trigger.get_next_trigger_time(timezone, None, start_time) assert abs(expected - next_trigger_time) <= timedelta(seconds=5) @pytest.mark.parametrize("jitter", [None, 5], ids=["nojitter", "jitter"]) @@ -318,14 +333,17 @@ def trigger(self, timezone): ) def test_next_trigger_time(self, trigger, timezone, start_time, expected): expected = timezone.localize(expected) if expected else None - assert trigger.get_next_trigger_time(None, timezone.localize(start_time)) == expected + assert ( + trigger.get_next_trigger_time(timezone, None, timezone.localize(start_time)) + == expected + ) def test_jitter(self, trigger, timezone): trigger.jitter = 5 start_time = timezone.localize(datetime(2022, 8, 6)) expected = timezone.localize(datetime(2022, 8, 7)) for _ in range(100): - next_trigger_time = trigger.get_next_trigger_time(None, start_time) + next_trigger_time = trigger.get_next_trigger_time(timezone, None, start_time) assert abs(expected - next_trigger_time) <= timedelta(seconds=5) @pytest.mark.parametrize("jitter", [None, 5], ids=["nojitter", "jitter"]) @@ -354,57 +372,61 @@ def test_pickle(self, trigger, jitter): class TestJitter: - def test_jitter_disabled(self): - dt = datetime(2022, 5, 25, 14, 49, 50) + def test_jitter_disabled(self, timezone): + dt = datetime(2022, 5, 25, 14, 49, 50, tzinfo=timezone) trigger = DummyTriggerWithJitter(dt, None) - now = datetime(2022, 5, 25, 13, 40, 44) - assert trigger.get_next_trigger_time(None, now) == dt + now = datetime(2022, 5, 25, 13, 40, 44, tzinfo=timezone) + assert trigger.get_next_trigger_time(timezone, None, now) == dt - def test_jitter_with_none_next_fire_time(self): + def test_jitter_with_none_next_fire_time(self, timezone): trigger = DummyTriggerWithJitter(None, 5) - now = datetime(2022, 5, 25, 13, 40, 44) - assert trigger.get_next_trigger_time(None, now) is None + now = datetime(2022, 5, 25, 13, 40, 44, tzinfo=timezone) + assert trigger.get_next_trigger_time(timezone, None, now) is None - def test_jitter_positive(self, monkeypatch): + def test_jitter_positive(self, monkeypatch, timezone): monkeypatch.setattr(random, "uniform", lambda a, b: 30.0) - now = datetime(2022, 5, 25, 13, 40, 44) + now = datetime(2022, 5, 25, 13, 40, 44, tzinfo=timezone) dt = datetime(2022, 5, 25, 14, 49, 50) - expected_dt = datetime(2022, 5, 25, 14, 50, 20) + expected_dt = datetime(2022, 5, 25, 14, 50, 20, tzinfo=timezone) trigger = DummyTriggerWithJitter(dt, 60) - assert trigger.get_next_trigger_time(None, now) == expected_dt + assert trigger.get_next_trigger_time(timezone, None, now) == expected_dt - def test_jitter_in_future_but_initial_date_in_past(self, monkeypatch): + def test_jitter_in_future_but_initial_date_in_past(self, monkeypatch, timezone): monkeypatch.setattr(random, "uniform", lambda a, b: 30.0) - now = datetime(2022, 5, 25, 13, 40, 44) + now = datetime(2022, 5, 25, 13, 40, 44, tzinfo=timezone) dt = datetime(2022, 5, 25, 13, 40, 30) - expected_dt = datetime(2022, 5, 25, 13, 41, 0) + expected_dt = datetime(2022, 5, 25, 13, 41, 0, tzinfo=timezone) trigger = DummyTriggerWithJitter(dt, 60) - assert trigger.get_next_trigger_time(None, now) == expected_dt + assert trigger.get_next_trigger_time(timezone, None, now) == expected_dt - def 
test_jitter_is_now(self, monkeypatch): + def test_jitter_is_now(self, monkeypatch, timezone): monkeypatch.setattr(random, "uniform", lambda a, b: 4.0) - now = datetime(2022, 5, 25, 13, 40, 44) + now = datetime(2022, 5, 25, 13, 40, 44, tzinfo=timezone) dt = datetime(2022, 5, 25, 13, 40, 40) expected_dt = now trigger = DummyTriggerWithJitter(dt, 60) - assert trigger.get_next_trigger_time(None, now) == expected_dt + assert trigger.get_next_trigger_time(timezone, None, now) == expected_dt - def test_jitter(self): - now = datetime(2022, 5, 25, 13, 36, 44) + def test_jitter(self, timezone): + now = datetime(2022, 5, 25, 13, 36, 44, tzinfo=timezone) dt = datetime(2022, 5, 25, 13, 40, 45) - min_expected_dt = datetime(2022, 5, 25, 13, 40, 40) - max_expected_dt = datetime(2022, 5, 25, 13, 40, 50) + min_expected_dt = datetime(2022, 5, 25, 13, 40, 40, tzinfo=timezone) + max_expected_dt = datetime(2022, 5, 25, 13, 40, 50, tzinfo=timezone) trigger = DummyTriggerWithJitter(dt, 5) for _ in range(0, 100): - assert min_expected_dt <= trigger.get_next_trigger_time(None, now) <= max_expected_dt + assert ( + min_expected_dt + <= trigger.get_next_trigger_time(timezone, None, now) + <= max_expected_dt + ) class TestCronTrigger: @@ -416,13 +438,13 @@ def test_cron_trigger_1(self, timezone): assert str(trigger) == "cron[year='2022/2', month='1/3', day='5-13']" start_at = timezone.localize(datetime(2021, 12, 1)) correct_next_date = timezone.localize(datetime(2022, 1, 5)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_cron_trigger_2(self, timezone): trigger = CronTrigger(year="2022/2", month="1/3", day="5-13", timezone=timezone) start_at = timezone.localize(datetime(2022, 10, 14)) correct_next_date = timezone.localize(datetime(2024, 1, 5)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_cron_trigger_3(self, timezone): trigger = CronTrigger(year="2022", month="feb-dec", hour="8-10", timezone=timezone) @@ -432,24 +454,21 @@ def test_cron_trigger_3(self, timezone): ) start_at = timezone.localize(datetime(2022, 1, 1)) correct_next_date = timezone.localize(datetime(2022, 2, 1, 8)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_cron_trigger_4(self, timezone): trigger = CronTrigger(year="2022", month="2", day="last", timezone=timezone) assert repr(trigger) == ( - "" + "" ) start_at = timezone.localize(datetime(2022, 2, 1)) correct_next_date = timezone.localize(datetime(2022, 2, 28)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_start_end_times_string(self, timezone, monkeypatch): - monkeypatch.setattr( - "asyncz.triggers.cron.trigger.get_localzone", Mock(return_value=timezone) - ) trigger = CronTrigger(start_at="2022-11-05 05:06:53", end_at="2023-11-05 05:11:32") - assert trigger.start_at == timezone.localize(datetime(2022, 11, 5, 5, 6, 53)) - assert trigger.end_at == timezone.localize(datetime(2023, 11, 5, 5, 11, 32)) + assert trigger.start_at == datetime(2022, 11, 5, 5, 6, 53) + assert trigger.end_at == datetime(2023, 11, 5, 5, 11, 32) def test_cron_zero_value(self, timezone): trigger = CronTrigger(year=2022, month=2, hour=0, 
timezone=timezone) @@ -463,7 +482,7 @@ def test_cron_year_list(self, timezone): assert str(trigger) == "cron[year='2023,2022']" start_at = timezone.localize(datetime(2023, 1, 1)) correct_next_date = timezone.localize(datetime(2023, 1, 1)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_cron_start_at(self, timezone): trigger = CronTrigger( @@ -477,28 +496,34 @@ def test_cron_start_at(self, timezone): assert str(trigger) == "cron[year='2022', month='2', hour='8-10']" start_at = timezone.localize(datetime(2022, 1, 1)) correct_next_date = timezone.localize(datetime(2022, 2, 4, 8)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_previous_trigger_time_1(self, timezone): trigger = CronTrigger(day="*", timezone=timezone) previous_fire_time = timezone.localize(datetime(2022, 11, 23)) now = timezone.localize(datetime(2022, 11, 26)) correct_next_date = timezone.localize(datetime(2022, 11, 24)) - assert trigger.get_next_trigger_time(previous_fire_time, now) == correct_next_date + assert ( + trigger.get_next_trigger_time(timezone, previous_fire_time, now) == correct_next_date + ) def test_previous_trigger_time_2(self, timezone): trigger = CronTrigger(day="*", timezone=timezone) previous_fire_time = timezone.localize(datetime(2022, 11, 23)) now = timezone.localize(datetime(2022, 11, 22)) correct_next_date = timezone.localize(datetime(2022, 11, 22)) - assert trigger.get_next_trigger_time(previous_fire_time, now) == correct_next_date + assert ( + trigger.get_next_trigger_time(timezone, previous_fire_time, now) == correct_next_date + ) def test_previous_trigger_time_3(self, timezone): trigger = CronTrigger(day="*", timezone=timezone) previous_fire_time = timezone.localize(datetime(2022, 4, 25)) now = timezone.localize(datetime(2022, 4, 25)) correct_next_date = timezone.localize(datetime(2022, 4, 26)) - assert trigger.get_next_trigger_time(previous_fire_time, now) == correct_next_date + assert ( + trigger.get_next_trigger_time(timezone, previous_fire_time, now) == correct_next_date + ) def test_cron_weekday_overlap(self, timezone): trigger = CronTrigger(year=2009, month=1, day="6-10", day_of_week="2-4", timezone=timezone) @@ -509,7 +534,7 @@ def test_cron_weekday_overlap(self, timezone): assert str(trigger) == "cron[year='2009', month='1', day='6-10', day_of_week='2-4']" start_at = timezone.localize(datetime(2009, 1, 1)) correct_next_date = timezone.localize(datetime(2009, 1, 7)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_cron_weekday_nomatch(self, timezone): trigger = CronTrigger(year=2009, month=1, day="6-10", day_of_week="0,6", timezone=timezone) @@ -520,7 +545,7 @@ def test_cron_weekday_nomatch(self, timezone): assert str(trigger) == "cron[year='2009', month='1', day='6-10', day_of_week='0,6']" start_at = timezone.localize(datetime(2009, 1, 1)) correct_next_date = None - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_cron_weekday_positional(self, timezone): trigger = CronTrigger(year=2009, month=1, day="6-10", day_of_week="0,6", timezone=timezone) @@ -531,7 +556,7 @@ def test_cron_weekday_positional(self, timezone): 
assert str(trigger) == "cron[year='2009', month='1', day='6-10', day_of_week='0,6']" start_at = timezone.localize(datetime(2009, 1, 1)) correct_next_date = None - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_week_1(self, timezone): trigger = CronTrigger(year=2009, month=2, week=8, timezone=timezone) @@ -541,7 +566,7 @@ def test_week_1(self, timezone): assert str(trigger) == "cron[year='2009', month='2', week='8']" start_at = timezone.localize(datetime(2009, 1, 1)) correct_next_date = timezone.localize(datetime(2009, 2, 16)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_week_2(self, timezone): trigger = CronTrigger(year=2009, week=15, day_of_week=2, timezone=timezone) @@ -551,7 +576,7 @@ def test_week_2(self, timezone): assert str(trigger) == "cron[year='2009', week='15', day_of_week='2']" start_at = timezone.localize(datetime(2009, 1, 1)) correct_next_date = timezone.localize(datetime(2009, 4, 8)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_cron_extra_coverage(self, timezone): trigger = CronTrigger(day="6,8", timezone=timezone) @@ -559,7 +584,7 @@ def test_cron_extra_coverage(self, timezone): assert str(trigger) == "cron[day='6,8']" start_at = timezone.localize(datetime(2022, 12, 31)) correct_next_date = timezone.localize(datetime(2023, 1, 6)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_cron_faulty_expr(self, timezone): pytest.raises(ValueError, CronTrigger, year="2009-fault", timezone=timezone) @@ -574,13 +599,13 @@ def test_cron_increment_weekday(self, timezone): assert str(trigger) == "cron[hour='5-6']" start_at = timezone.localize(datetime(2022, 9, 25, 7)) correct_next_date = timezone.localize(datetime(2022, 9, 26, 5)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date def test_month_rollover(self, timezone): trigger = CronTrigger(timezone=timezone, day=30) now = timezone.localize(datetime(2022, 2, 1)) expected = timezone.localize(datetime(2022, 3, 30)) - assert trigger.get_next_trigger_time(None, now) == expected + assert trigger.get_next_trigger_time(timezone, None, now) == expected def test_timezone_from_start_at(self, timezone): """ @@ -597,10 +622,14 @@ def test_end_at(self, timezone): trigger = CronTrigger(year=2022, hour=4, end_at=end_at) start_at = timezone.localize(datetime(2022, 4, 13, 2, 30)) - assert trigger.get_next_trigger_time(None, start_at - timedelta(1)) == start_at.replace( - day=12, hour=4, minute=0 + assert trigger.get_next_trigger_time( + timezone, None, start_at - timedelta(1) + ) == start_at.replace( + day=12, + hour=4, + minute=0, ) - assert trigger.get_next_trigger_time(None, start_at) is None + assert trigger.get_next_trigger_time(timezone, None, start_at) is None def test_different_tz(self, timezone): alter_tz = pytz.FixedOffset(-600) @@ -611,7 +640,7 @@ def test_different_tz(self, timezone): assert str(trigger) == "cron[year='2009', week='15', day_of_week='2']" start_at = alter_tz.localize(datetime(2008, 12, 31, 22)) correct_next_date = 
timezone.localize(datetime(2009, 4, 8)) - assert trigger.get_next_trigger_time(None, start_at) == correct_next_date + assert trigger.get_next_trigger_time(timezone, None, start_at) == correct_next_date @pytest.mark.parametrize( "trigger_args, start_at, start_at_dst, correct_next_date", @@ -634,9 +663,11 @@ def test_dst_change(self, trigger_args, start_at, start_at_dst, correct_next_dat trigger = CronTrigger(timezone=timezone, **trigger_args) start_at = timezone.localize(start_at, is_dst=start_at_dst) correct_next_date = timezone.localize(correct_next_date, is_dst=not start_at_dst) - assert str(trigger.get_next_trigger_time(None, start_at)) == str(correct_next_date) + assert str(trigger.get_next_trigger_time(timezone, None, start_at)) == str( + correct_next_date + ) - def test_timezone_change(self, timezone): + def test_timezone_change(self): """ Ensure that get_next_fire_time method returns datetimes in the timezone of the trigger and not in the timezone of the passed in start_at. @@ -647,7 +678,7 @@ def test_timezone_change(self, timezone): trigger = CronTrigger(hour=11, minute="*/5", timezone=est) start_at = cst.localize(datetime(2022, 9, 26, 10, 16)) correct_next_date = est.localize(datetime(2022, 9, 26, 11, 20)) - assert str(trigger.get_next_trigger_time(None, start_at)) == str(correct_next_date) + assert str(trigger.get_next_trigger_time(est, None, start_at)) == str(correct_next_date) def test_pickle(self, timezone): trigger = CronTrigger( @@ -664,13 +695,13 @@ def test_jitter_produces_differrent_valid_results(self, timezone): results = set() for _ in range(0, 100): - next_trigger_time = trigger.get_next_trigger_time(None, now) + next_trigger_time = trigger.get_next_trigger_time(timezone, None, now) results.add(next_trigger_time) assert timedelta(seconds=25) <= (next_trigger_time - now) <= timedelta(seconds=35) assert len(results) > 1 - def test_jitter_with_timezone(self, timezone): + def test_jitter_with_timezone(self): est = pytz.FixedOffset(-300) cst = pytz.FixedOffset(-360) trigger = CronTrigger(hour=11, minute="*/5", timezone=est, jitter=5) @@ -678,7 +709,7 @@ def test_jitter_with_timezone(self, timezone): correct_next_date = est.localize(datetime(2022, 9, 26, 11, 20)) for _ in range(0, 100): assert abs( - trigger.get_next_trigger_time(None, start_at) - correct_next_date + trigger.get_next_trigger_time(est, None, start_at) - correct_next_date ) <= timedelta(seconds=5) @pytest.mark.parametrize( @@ -698,7 +729,7 @@ def test_jitter_dst_change(self, trigger_args, start_at, start_at_dst, correct_n correct_next_date = timezone.localize(correct_next_date, is_dst=not start_at_dst) for _ in range(0, 100): - next_trigger_time = trigger.get_next_trigger_time(None, start_at) + next_trigger_time = trigger.get_next_trigger_time(timezone, None, start_at) assert abs(next_trigger_time - correct_next_date) <= timedelta(seconds=5) def test_jitter_with_end_at(self, timezone): @@ -707,7 +738,7 @@ def test_jitter_with_end_at(self, timezone): trigger = CronTrigger(minute="*", jitter=5, end_at=end_at) for _ in range(0, 100): - next_trigger_time = trigger.get_next_trigger_time(None, now) + next_trigger_time = trigger.get_next_trigger_time(timezone, None, now) assert next_trigger_time is None or next_trigger_time <= end_at @pytest.mark.parametrize( diff --git a/tests/test_utils.py b/tests/test_utils.py index 91492f2..8fa3b1c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -19,7 +19,6 @@ obj_to_ref, ref_to_obj, timedelta_seconds, - to_bool, to_datetime, to_float, to_int, @@ -71,41 +70,6 @@ 
def test_none(self): assert to_int(None) is None -class TestToBool: - @pytest.mark.parametrize( - "value", - [" True", "true ", "Yes", " yes ", "1 ", True], - ids=[ - "capital true", - "lowercase true", - "capital yes", - "lowercase yes", - "one", - "True", - ], - ) - def test_true(self, value): - assert to_bool(value) is True - - @pytest.mark.parametrize( - "value", - [" False", "false ", "No", " no ", "0 ", False], - ids=[ - "capital", - "lowercase false", - "capital no", - "lowercase no", - "zero", - "False", - ], - ) - def test_false(self, value): - assert to_bool(value) is False - - def test_bad_value(self): - assert to_bool("yep") is False - - class TestToTimezone: def test_str(self): value = to_timezone("Europe/London")