Lock clients. Only one calls the Watchdog. #21

Merged: 19 commits, Jul 12, 2018
22 changes: 6 additions & 16 deletions celery_serverless/handler.py
@@ -17,9 +17,10 @@
 logger.setLevel(os.environ.get('CELERY_SERVERLESS_LOGLEVEL'))
 print('Celery serverless loglevel:', logger.getEffectiveLevel())

-from redlock import RedLock
-from redis import StrictRedis
 from timeoutcontext import timeout as timeout_context
-from celery_serverless.watchdog import Watchdog, KombuQueueLengther, build_intercom, invoke_watchdog
+from celery_serverless.invoker import invoke_watchdog
+from celery_serverless.watchdog import Watchdog, KombuQueueLengther, build_intercom, get_watchdog_lock
 from celery_serverless.worker_management import spawn_worker, attach_hooks
 hooks = []
@@ -32,7 +33,7 @@ def worker(event, context, intercom_url=None):
     try:
         remaining_seconds = context.get_remaining_time_in_millis() / 1000.0
     except Exception as e:
-        logger.exception('Could not got remaining_seconds. Is the context right?')
+        logger.exception('Could not get remaining_seconds. Is the context right?')
         remaining_seconds = 5 * 60  # 5 minutes by default

     softlimit = remaining_seconds-30.0  # Poke the job 30sec before the abyss
@@ -62,22 +63,13 @@ def watchdog(event, context):

 @handler_wrapper
 def watchdog(event, context):
-    lock_name = os.environ.get('CELERY_SERVERLESS_LOCK_NAME', 'celery_serverless:watchdog')
-    lock_url = os.environ.get('CELERY_SERVERLESS_LOCK_URL')
-    assert lock_url, 'The CELERY_SERVERLESS_LOCK_URL envvar should be set. Even to "disabled" to disable it.'
-
     queue_url = os.environ.get('CELERY_SERVERLESS_QUEUE_URL')
     assert queue_url, 'The CELERY_SERVERLESS_QUEUE_URL envvar should be set. Even to "disabled" to disable it.'
@sbneto (Collaborator), Jul 11, 2018:
Letting it break with queue_url = os.environ['CELERY_SERVERLESS_QUEUE_URL'] and raising KeyError might make the code a bit simpler.

@alanjds (Owner, author), Jul 11, 2018:
But then it gives less of a clue about how to solve the problem: "Is it a code error or my mistake?"

To change the message of a KeyError, the code would get as long as it is now. Is the change worth it?

@sbneto (Collaborator):

I would be OK without a specific message. Seeing os.environ['CELERY_SERVERLESS_QUEUE_URL'] in the error is enough for me to know that I am missing that envvar. But whether to change it or not is up to you.

@alanjds (Owner, author):

For one line instead of two, I prefer to put my own error message in this case.
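
For reference, a minimal sketch of the two alternatives weighed in this thread (illustrative only, not part of the diff):

    import os
    os.environ.setdefault('CELERY_SERVERLESS_QUEUE_URL', 'disabled')  # so both options succeed if run as-is

    # Option 1: let the lookup break by itself. Shorter, but the traceback
    # only names the missing key.
    queue_url = os.environ['CELERY_SERVERLESS_QUEUE_URL']  # KeyError if unset

    # Option 2 (as merged): one extra line, but the failure message tells
    # the operator exactly what to set.
    queue_url = os.environ.get('CELERY_SERVERLESS_QUEUE_URL')
    assert queue_url, 'The CELERY_SERVERLESS_QUEUE_URL envvar should be set. Even to "disabled" to disable it.'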


     intercom_url = os.environ.get('CELERY_SERVERLESS_INTERCOM_URL')
     assert intercom_url, 'The CELERY_SERVERLESS_INTERCOM_URL envvar should be set. Even to "disabled" to disable it.'

-    if lock_url == 'disabled':
-        lock = None
-    elif lock_url.startswith(('redis://', 'rediss://')):
-        lock = RedLock(lock_name, connection_details=[{'url': node} for node in lock_url.split(',')])
-    else:
-        raise RuntimeError("This URL is not supported. Only 'redis[s]://...' is supported for now")
+    lock, lock_name = get_watchdog_lock(enforce=True)

     if queue_url == 'disabled':
         watched = None
@@ -106,11 +98,9 @@ def watchdog(event, context):
         lock.release()
     except (RuntimeError, AttributeError):
         pass
-    else:
-        time.sleep(1)  # Let distributed locks to propagate

     logger.info('All set. Reinvoking the Watchdog')
-    _, future = invoke_watchdog()
+    _, future = invoke_watchdog(check_lock=False)
     future.result()
     logger.info('Done reinvoking another Watchdog')

33 changes: 31 additions & 2 deletions celery_serverless/invoker.py
@@ -6,6 +6,7 @@
 import logging
 import codecs
 import json
+import multiprocessing
 from pprint import pformat

 import dirtyjson
@@ -29,8 +30,10 @@
 except ImportError:  # Boto3 is an optional extra on setup.py
     lambda_client = None


 from .cli_utils import run
-from .utils import run_aio_on_thread
+from .utils import run_aio_on_thread, get_client_lock


 CELERY_HANDLER_PATHS = {
     'worker': 'celery_serverless.handler_worker',
@@ -221,5 +224,31 @@ def invoke_worker(config=None, data=None, *args, **kwargs):
     return invoke(target='worker', extra_data=data or {}, *args, **kwargs)


-def invoke_watchdog(config=None, data=None, *args, **kwargs):
+def invoke_watchdog(config=None, data=None, check_lock=True, *args, **kwargs):
+    from .watchdog import get_watchdog_lock
+
+    if check_lock:
+        lock_watchdog = get_watchdog_lock(enforce=False)[0]
+        if lock_watchdog.locked():
+            logger.info('Watchdog lock already held. Giving up.')
+            return False, RuntimeError('Watchdog lock already held')
     return invoke(target='watchdog', extra_data=data or {}, *args, **kwargs)
+
+
+def client_invoke_watchdog(check_lock=True, blocking_lock=False, *args, **kwargs):
+    if not check_lock:
+        logger.debug('Not checking client lock')
+        return invoke_watchdog(check_lock=True, *args, **kwargs)
+
+    client_lock = get_client_lock()[0]
+    locked = client_lock.acquire(blocking_lock)
+    if not locked:
+        logger.info('Could not get Client lock. Giving up.')
+        return False, RuntimeError('Client lock already held')
+
+    logger.debug('Got the client lock')
+    try:
+        return invoke_watchdog(with_lock=True, *args, **kwargs)
+    finally:
+        logger.debug('Releasing the client lock')
+        client_lock.release()
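
A usage sketch for the new entry point (assuming the lock envvars are configured; the return shapes follow the code above):

    from celery_serverless import invoker

    # Non-blocking by default: if another client already holds the client
    # lock, we get (False, RuntimeError(...)) back instead of waiting.
    invoked, result = invoker.client_invoke_watchdog()

    # Or wait for the client lock instead of giving up immediately:
    invoked, result = invoker.client_invoke_watchdog(blocking_lock=True)
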
@sbneto (Collaborator):
Since an invoke_watchdog() was just made, we could let the client_lock expire by itself, as long as it expires before the lifespan of a watchdog ends. This could avoid an eventual call by another client. It only works for distributed locks that expire, though.

@alanjds (Owner, author):

Yes. One thought is to put a timeout on the client_lock instead of unlocking it, if the watchdog got invoked with success.

But right now the client lock can be a multiprocessing.Lock, if Redis is not available on the client or if it is undesirable to flood Redis with lock checks.

What do you think? Do this now, or leave it for some other PR?

@sbneto (Collaborator):

I think it is OK as it is for now; let's just keep it in mind for future work.

@alanjds (Owner, author):

Issue created for future work: #22
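
A hypothetical sketch of that expiring-lock idea (tracked in #22), assuming a redis-py lock with a TTL; the multiprocessing.Lock fallback has no expiry, which is the caveat raised above:

    from redis import StrictRedis

    redis = StrictRedis.from_url('redis://')
    # TTL chosen to expire before a watchdog run ends (60s is an assumption):
    client_lock = redis.lock('celery_serverless:watchdog-client', timeout=60)

    if client_lock.acquire(blocking=False):
        invoke_watchdog(check_lock=False)  # invoke_watchdog as defined in this PR
        # No explicit release(): the key expires on its own, shielding the
        # running watchdog from a second client invocation in the meantime.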

2 changes: 1 addition & 1 deletion celery_serverless/task.py
@@ -17,7 +17,7 @@ def trigger_invoke(task=None, *args, **kwargs):
     logging.warning("Serverless worker will probable not get the task,"
                     " as its queue %s is probable not being listened there",
                     kwargs['queue'])
-    return invoker.invoke()
+    return invoker.client_invoke_watchdog()


class TriggerServerlessBeforeMixin(object):
71 changes: 71 additions & 0 deletions celery_serverless/utils.py
@@ -1,7 +1,23 @@
 # coding: utf-8
+import os
 import asyncio
 from threading import Thread
 from inspect import isawaitable
+import importlib
+
+try:
+    from redis import StrictRedis
+
+    # MONKEYPATCH until https://github.com/andymccurdy/redis-py/pull/1007/ is merged
+    import redis.lock
+    def __locked(self):
+        if self.redis.get(self.name) is not None:
+            return True
+        return False
+    if not hasattr(redis.lock.Lock, 'locked'):
+        redis.lock.Lock.locked = __locked
+except ImportError:
+    pass

slave_loop = None # loop
slave_thread = None # type: Thread
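
A quick demonstration of what the monkeypatch provides (assuming a reachable local Redis; newer redis-py releases ship locked() natively):

    from redis import StrictRedis

    r = StrictRedis.from_url('redis://')
    lk = r.lock('demo:lock')

    assert lk.locked() is False      # no key in Redis yet
    lk.acquire(blocking=False)
    assert lk.locked() is True       # the key exists while the lock is held
    lk.release()
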
@@ -28,3 +44,58 @@ def run_aio_on_thread(coro):
     if not slave_thread:
         _start_thread()
     return asyncio.run_coroutine_threadsafe(coro, slave_loop)
+
+
+def _get_lock(lock_url='', lock_url_env='CELERY_SERVERLESS_LOCK_URL', lock_url_default='dummy_threading://',
+              lock_name='', lock_name_env='CELERY_SERVERLESS_LOCK_NAME', lock_name_default='',
+              enforce=True) -> '(Lock, str)':
+    """
+    Build or fetch a lock from `lock_url` or the `lock_url_env` envvar,
+    falling back to `lock_url_default` contents. Can be a python module
+    having a Lock inside. E.g.: 'multiprocessing://'
+
+    If the lock needs it, use `lock_name` or the `lock_name_env` envvar
+    for the lock name, falling back to `lock_name_default` contents.
+
+    Passing `enforce=True` will raise an AssertionError if the lock url is empty.
+    """
+    lock_name = lock_name or os.environ.get(lock_name_env, lock_name_default)
+    lock_url = lock_url or os.environ.get(lock_url_env, '')
+    if enforce:
+        if lock_url == 'disabled':
+            lock_url = ''
+        else:
+            assert lock_url, ('The %s envvar should be set. Even to "disabled" to disable it.' % lock_url_env)
+
+    if lock_url.startswith(('redis://', 'rediss://')):
+        redis = StrictRedis.from_url(lock_url)
+        lock = redis.lock(lock_name)
+    elif lock_url_default and not lock_url:
+        defaultlock_module_name = lock_url_default.partition('://')[0]
+        defaultlock_module = importlib.import_module(defaultlock_module_name)
+        lock = defaultlock_module.Lock()
+    else:
+        raise RuntimeError("This URL is not supported. Only 'redis[s]://...' is supported for now")
+    return lock, lock_name
+
+
+def get_watchdog_lock(enforce=True) -> '(Lock, str)':
+    return _get_lock(lock_url_env='CELERY_SERVERLESS_LOCK_URL', lock_url_default='dummy_threading://',
+                     lock_name_env='CELERY_SERVERLESS_LOCK_NAME', lock_name_default='celery_serverless:watchdog',
+                     enforce=enforce)
+
+
+_CLIENT_LOCK = {}
+
+def get_client_lock(enforce=False) -> '(Lock, str)':
+    if _CLIENT_LOCK:
+        return _CLIENT_LOCK['lock'], _CLIENT_LOCK['lock_name']
+
+    lock, lock_name = _get_lock(
+        lock_url_env='CELERY_SERVERLESS_LOCK_URL', lock_url_default='multiprocessing://',
+        lock_name_env='CELERY_SERVERLESS_LOCK_NAME', lock_name_default='celery_serverless:watchdog-client',
+        enforce=enforce,
+    )
+
+    _CLIENT_LOCK.update({'lock': lock, 'lock_name': lock_name})
+    return _CLIENT_LOCK['lock'], _CLIENT_LOCK['lock_name']
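
How the two helpers resolve in practice; a sketch, noting that both read the same pair of envvars and differ only in their defaults (the dummy_threading fallback module exists on Python < 3.9):

    import os

    # With a Redis URL configured, both helpers hand back redis-py locks:
    os.environ['CELERY_SERVERLESS_LOCK_URL'] = 'redis://localhost:6379/0'
    lock, name = get_watchdog_lock()   # name == 'celery_serverless:watchdog'

    # With the envvar unset, each falls back to its default module URL,
    # importing that module and using its Lock class:
    del os.environ['CELERY_SERVERLESS_LOCK_URL']
    lock, name = get_client_lock()     # multiprocessing.Lock(), cached per process
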
7 changes: 5 additions & 2 deletions celery_serverless/watchdog.py
@@ -1,4 +1,5 @@
 # coding: utf-8
+import os
 import time
 import uuid
 import logging
@@ -13,7 +14,9 @@
 from redis import StrictRedis
 from kombu import Connection
 from kombu.transport import pyamqp
-from celery_serverless.invoker import invoke_worker, invoke_watchdog
+from celery_serverless.invoker import invoke_worker
+
+from .utils import get_watchdog_lock

logger = logging.getLogger(__name__)
logger.setLevel('DEBUG')
@@ -104,7 +107,7 @@ def _done_callback(fut):
         return success_calls

     def monitor(self):
-        locked = self._lock.acquire()
+        locked = self._lock.acquire(False)
         if not locked:
             logger.info('Could not get the lock. Giving up.')
             return 0
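
The switch to a non-blocking acquire above is what makes a losing watchdog bail out instead of queuing behind the winner. The same positional flag works for threading and multiprocessing locks as well as the patched redis-py Lock; a minimal illustration:

    from threading import Lock

    lock = Lock()
    assert lock.acquire(False) is True    # free lock: acquired immediately
    assert lock.acquire(False) is False   # already held: returns at once, no wait
    lock.release()
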
1 change: 1 addition & 0 deletions tests/test_celery_serverless.py
@@ -58,6 +58,7 @@ def test_watchdog_needs_envvar(envname):
     else:
         raise RuntimeError('Had not raised an AssertionError')

+
 @pytest.mark.timeout(30)
 def test_watchdog_monitor_redis_queues(monkeypatch):
     queue_url = 'redis://'
82 changes: 82 additions & 0 deletions tests/test_invoke.py
@@ -0,0 +1,82 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Tests for `celery_worker_serverless` package."""

import time
import uuid
import logging
import pytest
from pytest_shutil import env
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

from celery_serverless import watchdog, invoker

logger = logging.getLogger(__name__)


@pytest.mark.timeout(30)
def test_watchdog_monitor_redis_queues(monkeypatch):
    lock_url = 'redis://'

    redis = pytest.importorskip('redis')
    from redis.exceptions import ConnectionError

    conn = redis.StrictRedis.from_url(lock_url)
    try:
        conn.ping()
    except ConnectionError as err:
        pytest.skip('Redis server is not available: %s' % err)

    # Redis is available.
    # Let's set it up before testing the watchdog.

    times_invoked = 0

    def _simulate_watchdog_invocation(*args, **kwargs):
        """
        Simulates a Watchdog invocation cycle via Redis lock changes
        """
        logger.warning('Simulating a Watchdog invocation: START')

        nonlocal times_invoked
        times_invoked += 1

        # 1) The Watchdog fetches its lock
        lock, lock_name = watchdog.get_watchdog_lock(enforce=True)

        # 2) It runs with the lock, or cancels
        with_lock = lock.acquire(False)
        if not with_lock:
            logger.info('Watchdog COULD NOT get the lock')
        else:
            logger.info('Watchdog GOT the lock')
            time.sleep(5)
            lock.release()
            logger.info('Watchdog RELEASED the lock')

        logger.warning('Simulating a Watchdog invocation: END')

    conn.flushdb()
    with env.set_env(CELERY_SERVERLESS_LOCK_URL=lock_url):
        _simulate_watchdog_invocation()  # Just be sure that it works.

    with ThreadPoolExecutor() as executor:
        monkeypatch.setattr(
            'celery_serverless.invoker.invoke',
            lambda *ar, **kw: (True, executor.submit(_simulate_watchdog_invocation)),
        )

        client_futures = []
        times_invoked = 0

        with env.set_env(CELERY_SERVERLESS_LOCK_URL=lock_url):
            for i in range(20):
                client_futures.append(executor.submit(invoker.client_invoke_watchdog))

        client_invokes = [fut.result() for fut in as_completed(client_futures)]
        result_flags, results = zip(*client_invokes)

    assert times_invoked == 1, 'More than one client succeeded to invoke_watchdog'
    assert len([i for i in result_flags if i == True]) == 1, 'More than one client got a lock'