-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lock clients. Only one calls the Watchdog. #21
Changes from 11 commits
1084f7b
c680c15
73739b6
3f79dbd
1942f10
0cfa250
4d8ad80
bacf113
61b08c9
f7d29d5
c8b91fc
cd1aed7
0a12515
a25c967
0b317bb
346b147
5ff577e
bdae12c
b0f3ae0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
import logging | ||
import codecs | ||
import json | ||
import multiprocessing | ||
from pprint import pformat | ||
|
||
import dirtyjson | ||
|
@@ -29,9 +30,27 @@ | |
except ImportError: # Boto3 is an optional extra on setup.py | ||
lambda_client = None | ||
|
||
try: | ||
from redis import StrictRedis | ||
|
||
# MONKEYPATCH until https://github.com/andymccurdy/redis-py/pull/1007/ is merged | ||
import redis.lock | ||
def __locked(self): | ||
if self.redis.get(self.name) is not None: | ||
return True | ||
return False | ||
if not hasattr(redis.lock.Lock, 'locked'): | ||
redis.lock.Lock.locked = __locked | ||
except ImportError: | ||
pass | ||
|
||
|
||
from .cli_utils import run | ||
from .utils import run_aio_on_thread | ||
|
||
|
||
_CLIENT_LOCK = {} | ||
|
||
CELERY_HANDLER_PATHS = { | ||
'worker': 'celery_serverless.handler_worker', | ||
'watchdog': 'celery_serverless.handler_watchdog', | ||
|
@@ -221,5 +240,55 @@ def invoke_worker(config=None, data=None, *args, **kwargs): | |
return invoke(target='worker', extra_data=data or {}, *args, **kwargs) | ||
|
||
|
||
def invoke_watchdog(config=None, data=None, *args, **kwargs): | ||
def invoke_watchdog(config=None, data=None, check_lock=True, *args, **kwargs): | ||
from .watchdog import _get_watchdog_lock | ||
|
||
if check_lock: | ||
lock_watchdog = _get_watchdog_lock(enforce=False)[0] | ||
if lock_watchdog.locked(): | ||
logger.info('Watchdog lock already held. Giving up.') | ||
return False, RuntimeError('Watchdog lock already held') | ||
return invoke(target='watchdog', extra_data=data or {}, *args, **kwargs) | ||
|
||
|
||
def client_invoke_watchdog(check_lock=True, blocking_lock=False, *args, **kwargs): | ||
if not check_lock: | ||
logger.debug('Not checking client lock') | ||
return invoke_watchdog(check_lock=True, *args, **kwargs) | ||
|
||
client_lock = _get_client_lock()[0] | ||
locked = client_lock.acquire(blocking_lock) | ||
if not locked: | ||
logger.info('Could not get Client lock. Giving up.') | ||
return False, RuntimeError('Client lock already held') | ||
|
||
logger.debug('Got the client lock') | ||
try: | ||
return invoke_watchdog(with_lock=True, *args, **kwargs) | ||
finally: | ||
logger.debug('Releasing the client lock') | ||
client_lock.release() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. One though is to put a timeout instead of unlock the client_lock if the watchdog got invoked with success. But right now the client lock can be a What do you think? Do this now or let go for some other PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is ok as it is for now, lets just keep it in mind for future work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue created for future work: #22 |
||
|
||
|
||
def _get_client_lock(lock_url='', lock_name='', default=multiprocessing.Lock, enforce=False): | ||
if _CLIENT_LOCK and lock_name == _CLIENT_LOCK['lock_name']: | ||
return _CLIENT_LOCK['lock'], _CLIENT_LOCK['lock_name'] | ||
|
||
lock_name = lock_name or os.environ.get('CELERY_SERVERLESS_CLIENT_LOCK_NAME', 'celery_serverless:watchdog-client') | ||
lock_url = lock_url or os.environ.get('CELERY_SERVERLESS_LOCK_URL', '(unavailable)') | ||
if enforce: | ||
if lock_url == 'disabled': | ||
lock_url = '' | ||
else: | ||
assert lock_url, 'The CELERY_SERVERLESS_LOCK_URL envvar should be set. Even to "disabled" to disable it.' | ||
|
||
if default and not lock_url: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This born as a copy from Generalizing both will make the However, I had not understood your "be a string and check for multiprocessing". You mean receiving There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, I think it is strange that sometimes it is a class, sometimes a url. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sbneto Take a look on how |
||
lock = default() | ||
elif lock_url.startswith(('redis://', 'rediss://')): | ||
redis = StrictRedis.from_url(lock_url) | ||
lock = redis.lock(lock_name + '-client') | ||
else: | ||
raise RuntimeError("This URL is not supported. Only 'redis[s]://...' is supported for now") | ||
|
||
_CLIENT_LOCK.update({'lock': lock, 'lock_name': lock_name}) | ||
return _CLIENT_LOCK['lock'], _CLIENT_LOCK['lock_name'] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
# coding: utf-8 | ||
import os | ||
import time | ||
import uuid | ||
import logging | ||
|
@@ -13,7 +14,7 @@ | |
from redis import StrictRedis | ||
from kombu import Connection | ||
from kombu.transport import pyamqp | ||
from celery_serverless.invoker import invoke_worker, invoke_watchdog | ||
from celery_serverless.invoker import invoke_worker | ||
|
||
logger = logging.getLogger(__name__) | ||
logger.setLevel('DEBUG') | ||
|
@@ -23,6 +24,25 @@ | |
DEFAULT_STARTED_TIMEOUT = 30 # half minute | ||
|
||
|
||
def _get_watchdog_lock(lock_url='', lock_name='', default=dummy_threading.Lock, enforce=True) -> '(Lock, str)': | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Both functions should be merged. This is the 1st test-passing code version anyway ;) Lets get it better before merge the PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Take a look on |
||
lock_name = lock_name or os.environ.get('CELERY_SERVERLESS_LOCK_NAME', 'celery_serverless:watchdog') | ||
lock_url = lock_url or os.environ.get('CELERY_SERVERLESS_LOCK_URL', '') | ||
if enforce: | ||
if lock_url == 'disabled': | ||
lock_url = '' | ||
else: | ||
assert lock_url, 'The CELERY_SERVERLESS_LOCK_URL envvar should be set. Even to "disabled" to disable it.' | ||
|
||
if lock_url.startswith(('redis://', 'rediss://')): | ||
redis = StrictRedis.from_url(lock_url) | ||
lock = redis.lock(lock_name) | ||
elif default and not lock_url: | ||
lock = default() | ||
else: | ||
raise RuntimeError("This URL is not supported. Only 'redis[s]://...' is supported for now") | ||
return lock, lock_name | ||
|
||
|
||
class Watchdog(object): | ||
def __init__(self, communicator=None, name='', lock=None, watched=None): | ||
self._intercom = communicator | ||
|
@@ -104,7 +124,7 @@ def _done_callback(fut): | |
return success_calls | ||
|
||
def monitor(self): | ||
locked = self._lock.acquire() | ||
locked = self._lock.acquire(False) | ||
if not locked: | ||
logger.info('Could not get the lock. Giving up.') | ||
return 0 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
|
||
"""Tests for `celery_worker_serverless` package.""" | ||
|
||
import time | ||
import uuid | ||
import logging | ||
import pytest | ||
from pytest_shutil import env | ||
from concurrent.futures import ThreadPoolExecutor, as_completed | ||
from datetime import datetime | ||
|
||
from celery_serverless import watchdog, invoker | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@pytest.mark.timeout(30) | ||
def test_watchdog_monitor_redis_queues(monkeypatch): | ||
lock_url = 'redis://' | ||
|
||
redis = pytest.importorskip('redis') | ||
from redis.exceptions import ConnectionError | ||
|
||
conn = redis.StrictRedis.from_url(lock_url) | ||
try: | ||
conn.ping() | ||
except ConnectionError as err: | ||
pytest.skip('Redis server is not available: %s' % err) | ||
|
||
# Redis is available. | ||
# Lets set it up before test the watchdog | ||
|
||
times_invoked = 0 | ||
|
||
def _simulate_watchdog_invocation(*args, **kwargs): | ||
""" | ||
Simulates a Watchdog invocation cycle via Redis locks changes | ||
""" | ||
logger.warning('Simulating an Watchdog invocation: START') | ||
|
||
nonlocal times_invoked | ||
times_invoked += 1 | ||
|
||
# 1) Watchdog fetches its lock | ||
lock, lock_name = watchdog._get_watchdog_lock(enforce=True) | ||
|
||
# 2) It runs with the lock or cancels | ||
with_lock = lock.acquire(False) | ||
if not with_lock: | ||
logger.info('Watchdog COULD NOT got the lock') | ||
else: | ||
logger.info('Watchdog GOT the lock') | ||
time.sleep(5) | ||
lock.release() | ||
logger.info('Watchdog RELEASED the lock') | ||
|
||
logger.warning('Simulating an Watchdog invocation: END') | ||
|
||
conn.flushdb() | ||
with env.set_env(CELERY_SERVERLESS_LOCK_URL=lock_url): | ||
_simulate_watchdog_invocation() # Just be sure that it works. | ||
|
||
with ThreadPoolExecutor() as executor: | ||
monkeypatch.setattr( | ||
'celery_serverless.invoker.invoke', | ||
lambda *ar, **kw: (True, executor.submit(_simulate_watchdog_invocation)), | ||
) | ||
|
||
client_futures = [] | ||
times_invoked = 0 | ||
|
||
with env.set_env(CELERY_SERVERLESS_LOCK_URL=lock_url): | ||
for i in range(20): | ||
client_futures.append(executor.submit(invoker.client_invoke_watchdog)) | ||
|
||
client_invokes = [fut.result() for fut in as_completed(client_futures)] | ||
result_flags, results = zip(*client_invokes) | ||
|
||
assert times_invoked == 1, 'More than one client succeeded to invoke_watchdog' | ||
assert len([i for i in result_flags if i == True]) == 1, 'More than one client got a lock' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Letting it break with
queue_url = os.environ['CELERY_SERVERLESS_QUEUE_URL']
and raisingKeyError
might make the code a bit simpler.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But then it gives less clue about how to solve the problem. "Is it a code error or my mistake?".
To change the message of a KeyError it will get as long as it is now. Is the change worth?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would be ok without a specific message. Seeing
os.environ['CELERY_SERVERLESS_QUEUE_URL']
on the error is enough for me to know that I am missing that envvar. But whether to change or not is up to you.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For one line instead of two, I do prefer to put my own error message on this case.