From 2ecc84bddb3d0111a2e9f982cb0776daff1be5ea Mon Sep 17 00:00:00 2001 From: Jose Navas Date: Fri, 19 Dec 2014 14:37:08 -0700 Subject: [PATCH 1/6] Removing toredis and adding our own pubsub --- moi/__init__.py | 6 +++- moi/group.py | 30 +++++++++-------- moi/pubsub.py | 86 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 14 deletions(-) create mode 100644 moi/pubsub.py diff --git a/moi/__init__.py b/moi/__init__.py index eba4fb8..ff835a8 100644 --- a/moi/__init__.py +++ b/moi/__init__.py @@ -7,12 +7,14 @@ # ----------------------------------------------------------------------------- import os +from tornado.ioloop import IOLoop from redis import Redis from future import standard_library with standard_library.hooks(): from configparser import ConfigParser from moi.context import Context +from moi.pubsub import PubSub REDIS_KEY_TIMEOUT = 84600 * 14 # two weeks @@ -30,6 +32,8 @@ password=_config.get('redis', 'password'), db=_config.get('redis', 'db')) +# Instantiate the pubsub object +pubsub = PubSub(r_client, IOLoop.instance()) # setup contexts ctxs = {name: Context(name) @@ -37,4 +41,4 @@ ctx_default = _config.get('ipython', 'default') __version__ = '0.1.0-dev' -__all__ = ['r_client', 'ctxs', 'ctx_default', 'REDIS_KEY_TIMEOUT'] +__all__ = ['r_client', 'ctxs', 'ctx_default', 'REDIS_KEY_TIMEOUT', 'pubsub'] diff --git a/moi/group.py b/moi/group.py index e64ccb2..1093df3 100644 --- a/moi/group.py +++ b/moi/group.py @@ -10,11 +10,10 @@ from uuid import uuid4 -import toredis from redis import ResponseError from tornado.escape import json_decode, json_encode -from moi import r_client, ctx_default +from moi import r_client, ctx_default, pubsub _children_key = lambda x: x + ':children' _pubsub_key = lambda x: x + ':pubsub' @@ -32,14 +31,15 @@ class Group(object): `dict`. Any return is ignored. """ def __init__(self, group, forwarder=None): - self.toredis = toredis.Client() - self.toredis.connect() - + # Keep track of the channels that we are listening along with the + # function that we will need to execute to unsubscribe from the channel + # Format: {pubsub_key: (id, function)} self._listening_to = {} self.group = group self.group_children = _children_key(group) self.group_pubsub = _pubsub_key(group) + self.group_unsubscriber = None if forwarder is None: self.forwarder = lambda x: None @@ -70,9 +70,10 @@ def __del__(self): def close(self): """Unsubscribe the group and all jobs being listened too""" - for channel in self._listening_to: - self.toredis.unsubscribe(channel) - self.toredis.unsubscribe(self.group_pubsub) + for _, unsubscriber in self._listening_to.values(): + unsubscriber() + + self.group_unsubscriber() def _decode(self, data): try: @@ -83,19 +84,21 @@ def _decode(self, data): @property def jobs(self): """Get the known job IDs""" - return self._listening_to.values() + return [val[0] for val in self._listening_to.values()] def listen_for_updates(self): """Attach a callback on the group pubsub""" - self.toredis.subscribe(self.group_pubsub, callback=self.callback) + self.group_unsubscriber = pubsub.subscribe(self.group_pubsub, + self.callback) def listen_to_node(self, id_): """Attach a callback on the job pubsub if it exists""" if r_client.get(id_) is None: return else: - self.toredis.subscribe(_pubsub_key(id_), callback=self.callback) - self._listening_to[_pubsub_key(id_)] = id_ + key = _pubsub_key(id_) + unsubscriber = pubsub.subscribe(key, callback=self.callback) + self._listening_to[key] = (id_, unsubscriber) return id_ def unlisten_to_node(self, id_): @@ -114,8 +117,9 @@ def unlisten_to_node(self, id_): id_pubsub = _pubsub_key(id_) if id_pubsub in self._listening_to: + _, unsubcriber = self._listening_to[id_pubsub] del self._listening_to[id_pubsub] - self.toredis.unsubscribe(id_pubsub) + unsubcriber() parent = json_decode(r_client.get(id_)).get('parent', None) if parent is not None: diff --git a/moi/pubsub.py b/moi/pubsub.py new file mode 100644 index 0000000..b3856ac --- /dev/null +++ b/moi/pubsub.py @@ -0,0 +1,86 @@ +r"""Pub-sub communication""" + +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- + +from tornado.ioloop import PeriodicCallback + +from collections import defaultdict + + +class PubSub(object): + def __init__(self, r_client, tornado_ioloop): + self.channel_listeners = defaultdict(list) + self._pubsub = r_client.pubsub(ignore_subscribe_messages=True) + self._callback = PeriodicCallback(self.get_message, 1) + self._callback.start() + + def __del__(self): + # Tell tornado to stop asking as + self._callback.stop() + for key in self.channel_listeners: + self._pubsub.unsubscribe(key) + self._pubsub.close() + + def subscribe(self, channel, callback): + """Subscribe a callback to a channel + + Parameters + ---------- + channel : str + The channel to register the callback + callback : function + The function to be registered + + Returns + ------- + function + The function to call in order to unsubscribe from the channel + """ + # If the current object was not already listening to the channel, + # subscribe to the channel + if channel not in self.channel_listeners: + self._pubsub.subscribe(channel) + + # Add the callback to the channel's listener list + self.channel_listeners[channel].append(callback) + + # Create the function used to unsubscribe the callback + def destructor(): + self.channel_listeners[channel].remove(callback) + # Do some clean-up: if there is no body listen to a channel + # unsubscribe the object from it + if len(self.channel_listeners[channel]) == 0: + self._pubsub.unsubscribe(channel) + + # Return the unsubscribe function + return destructor + + def get_message(self): + """Callback for the tornado's IOLoop""" + # Get a message (if exists) + message = self._pubsub.get_message() + if message: + # There is a message! Execute all the callback functions + self._notify(message) + + def _notify(self, message): + """Notify the message to all it's callback functions + + Parameters + ---------- + message : dict + The message received + """ + # Get the channel of the message + channel = message['channel'] + # Get the actual data + data = message['data'] + # Call all the callback functions passing the data received + for callback in self.channel_listeners[channel]: + callback(data) From 78293c12c75f3e82df4ac650855845cbbd959dc7 Mon Sep 17 00:00:00 2001 From: Jose Navas Date: Sat, 20 Dec 2014 18:06:38 -0700 Subject: [PATCH 2/6] Fixing tests --- moi/group.py | 8 ++++-- moi/job.py | 2 +- moi/test/test_group.py | 55 ++++++++++++++++++++++++------------------ moi/test/test_job.py | 31 +++++++++++++++--------- 4 files changed, 58 insertions(+), 38 deletions(-) diff --git a/moi/group.py b/moi/group.py index 1093df3..a06a8e7 100644 --- a/moi/group.py +++ b/moi/group.py @@ -286,7 +286,8 @@ def _action_get(self, ids): ids = self.jobs result = [] - ids = set(ids) + ids_seen = set() + ids = list(ids) while ids: id_ = ids.pop() @@ -309,7 +310,10 @@ def _action_get(self, ids): if payload['type'] == 'group': for obj in self._traverse(id_): - ids.add(obj['id']) + id_ = obj['id'] + if id_ not in ids_seen: + ids.add(id_) + ids_seen.add(id_) return result diff --git a/moi/job.py b/moi/job.py index 00133c3..2756430 100644 --- a/moi/job.py +++ b/moi/job.py @@ -72,7 +72,7 @@ def _status_change(id, new_status): str The old status """ - job_info = json.loads(r_client.get(id)) + job_info = json.loads(r_client.get(id).decode('utf-8')) old_status = job_info['status'] job_info['status'] = new_status _deposit_payload(job_info) diff --git a/moi/test/test_group.py b/moi/test/test_group.py index c583630..e05aaac 100644 --- a/moi/test/test_group.py +++ b/moi/test/test_group.py @@ -48,16 +48,22 @@ def test_listen_for_updates(self): pass # nothing to test... def test_listen_to_node(self): - self.assertEqual(sorted(self.obj._listening_to.items()), - [('a:pubsub', 'a'), - ('b:pubsub', 'b'), - ('c:pubsub', 'c')]) + exp = {'a:pubsub': 'a', + 'b:pubsub': 'b', + 'c:pubsub': 'c'} + self.assertEqual(sorted(self.obj._listening_to.keys()), + sorted(exp.keys())) + for key in self.obj._listening_to: + self.assertEqual(self.obj._listening_to[key][0], exp[key]) def test_unlisten_to_node(self): self.assertEqual(self.obj.unlisten_to_node('b'), 'b') - self.assertEqual(sorted(self.obj._listening_to.items()), - [('a:pubsub', 'a'), - ('c:pubsub', 'c')]) + exp = {'a:pubsub': 'a', + 'c:pubsub': 'c'} + self.assertEqual(sorted(self.obj._listening_to.keys()), + sorted(exp.keys())) + for key in self.obj._listening_to: + self.assertEqual(self.obj._listening_to[key][0], exp[key]) self.assertEqual(self.obj.unlisten_to_node('foo'), None) def test_callback(self): @@ -99,15 +105,18 @@ def __call__(self, data): self.obj.forwarder = fwd self.obj.action('add', ['d', 'e']) - self.assertEqual(fwd.result, [ + + exp = [ {'add': {u'id': u'd', u'name': u'other job', u'type': u'job'}}, - {'add': {u'id': u'e', u'name': u'other job e', u'type': u'job'}}]) + {'add': {u'id': u'e', u'name': u'other job e', u'type': u'job'}}] + + self.assertCountEqual(fwd.result, exp) self.obj.action('remove', ['e', 'd']) self.assertEqual(fwd.result, [ {'remove': - {u'id': u'e', u'name': u'other job e', u'type': u'job'}}, + {u'id': u'd', u'name': u'other job', u'type': u'job'}}, {'remove': - {u'id': u'd', u'name': u'other job', u'type': u'job'}}]) + {u'id': u'e', u'name': u'other job e', u'type': u'job'}}]) self.obj.action('remove', ['d']) self.assertEqual(fwd.result, []) @@ -129,11 +138,11 @@ def __call__(self, data): self.obj.forwarder = fwd self.obj.job_action('update', ['a', 'b']) - self.assertEqual(fwd.result, [{'update': {u'id': u'a', - u'name': u'a', - u'type': u'job'}}, - {'update': {u'id': u'b', + self.assertEqual(fwd.result, [{'update': {u'id': u'b', u'name': u'b', + u'type': u'job'}}, + {'update': {u'id': u'a', + u'name': u'a', u'type': u'job'}}]) with self.assertRaises(TypeError): @@ -145,8 +154,8 @@ def __call__(self, data): def test_action_add(self): resp = self.obj._action_add(['d', 'f', 'e']) self.assertEqual(resp, [ - {u'id': u'd', u'name': u'other job', u'type': u'job'}, - {u'id': u'e', u'name': u'other job e', u'type': u'job'}]) + {u'id': u'e', u'name': u'other job e', u'type': u'job'}, + {u'id': u'd', u'name': u'other job', u'type': u'job'}]) self.assertIn('d:pubsub', self.obj._listening_to) self.assertIn('e:pubsub', self.obj._listening_to) self.assertNotIn('f:pubsub', self.obj._listening_to) @@ -155,23 +164,23 @@ def test_action_remove(self): self.obj._action_add(['d', 'f', 'e']) resp = self.obj._action_remove(['a', 'd', 'f', 'c', 'e']) self.assertEqual(resp, [ - {u'id': u'a', u'name': u'a', u'type': u'job'}, - {u'id': u'd', u'name': u'other job', u'type': u'job'}, + {u'id': u'e', u'name': u'other job e', u'type': u'job'}, {u'id': u'c', u'name': u'c', u'type': u'job'}, - {u'id': u'e', u'name': u'other job e', u'type': u'job'}]) + {u'id': u'd', u'name': u'other job', u'type': u'job'}, + {u'id': u'a', u'name': u'a', u'type': u'job'}]) self.assertNotIn('a:pubsub', self.obj._listening_to) self.assertNotIn('c:pubsub', self.obj._listening_to) self.assertNotIn('d:pubsub', self.obj._listening_to) self.assertNotIn('e:pubsub', self.obj._listening_to) self.assertNotIn('f:pubsub', self.obj._listening_to) - self.assertEqual(r_client.smembers('testing:children'), {'b'}) + self.assertEqual(r_client.smembers('testing:children'), {b'b'}) def test_action_get(self): resp = self.obj._action_get(['d', 'f', 'e', None]) self.assertEqual(resp, [ - {u'id': u'd', u'name': u'other job', u'type': u'job'}, - {u'id': u'e', u'name': u'other job e', u'type': u'job'}]) + {u'id': u'e', u'name': u'other job e', u'type': u'job'}, + {u'id': u'd', u'name': u'other job', u'type': u'job'}]) if __name__ == '__main__': diff --git a/moi/test/test_job.py b/moi/test/test_job.py index 1739add..331ab83 100644 --- a/moi/test/test_job.py +++ b/moi/test/test_job.py @@ -24,7 +24,9 @@ def setUp(self): self.test_job_info = {'status': 'old status', 'id': self.test_id, - 'pubsub': self.test_pubsub} + 'pubsub': self.test_pubsub, + 'context': list(ctxs)[0], + 'parent': None} def tearDown(self): for k in self.test_keys: @@ -40,7 +42,7 @@ def test_status_change(self): def test_deposit_payload(self): _deposit_payload(self.test_job_info) - obs = json.loads(r_client.get(self.test_id)) + obs = json.loads(r_client.get(self.test_id).decode('utf-8')) self.assertEqual(obs, self.test_job_info) def test_redis_wrap(self): @@ -51,17 +53,20 @@ def foo(a, b, **kwargs): _redis_wrap(self.test_job_info, foo, 1, 2) sleep(2) - obs = json.loads(r_client.get(self.test_job_info['id'])) + obs = json.loads( + r_client.get(self.test_job_info['id']).decode('utf-8')) self.assertEqual(obs['result'], 3) self.assertEqual(obs['status'], 'Success') self.assertNotEqual(obs['date_start'], None) self.assertNotEqual(obs['date_end'], None) r_client.set(self.test_job_info['id'], json.dumps(self.test_job_info)) - _redis_wrap(self.test_job_info, foo, 1, 2, 3) + with self.assertRaises(TypeError): + _redis_wrap(self.test_job_info, foo, 1, 2, 3) sleep(2) - obs = json.loads(r_client.get(self.test_job_info['id'])) + obs = json.loads( + r_client.get(self.test_job_info['id']).decode('utf-8')) self.assertEqual(obs['result'][0], u'Traceback (most recent call last):\n') self.assertEqual(obs['status'], 'Failed') @@ -73,27 +78,29 @@ def foo(a, b, c=10, **kwargs): return a+b+c for ctx in ctxs: - id_, pid_ = submit(ctx, 'no parent', 'test', '/', foo, 1, 2, c=15) + id_, pid_, async_result = submit(ctx, 'no parent', 'test', '/', + foo, 1, 2, c=15) self.test_keys.append(id_) self.test_keys.append(pid_) sleep(2) - obs = json.loads(r_client.get(id_)) + obs = json.loads(r_client.get(id_).decode('utf-8')) self.assertEqual(obs['result'], 18) self.assertEqual(obs['status'], 'Success') self.assertNotEqual(obs['date_start'], None) self.assertNotEqual(obs['date_end'], None) def test__submit(self): - ctx = ctxs.values()[0] + ctx = list(ctxs.values())[0] cmd = 'echo "hello"' - id_, pid_ = _submit(ctx, 'no parent', 'test', '/', system_call, cmd) + id_, pid_, async_result = _submit(ctx, 'no parent', 'test', '/', + system_call, cmd) self.test_keys.append(id_) self.test_keys.append(pid_) sleep(2) - obs = json.loads(r_client.get(id_)) + obs = json.loads(r_client.get(id_).decode('utf-8')) self.assertEqual(obs['result'], [u"hello\n", u"", 0]) self.assertEqual(obs['status'], 'Success') self.assertNotEqual(obs['date_start'], None) @@ -103,13 +110,13 @@ def test_submit_nouser(self): def foo(a, b, c=10, **kwargs): return a+b+c - id_, pid_ = submit_nouser(foo, 1, 2, c=20) + id_, pid_, async_result = submit_nouser(foo, 1, 2, c=20) self.test_keys.append(id_) self.test_keys.append(pid_) sleep(2) - obs = json.loads(r_client.get(id_)) + obs = json.loads(r_client.get(id_).decode('utf-8')) self.assertEqual(obs['result'], 23) self.assertEqual(obs['status'], 'Success') self.assertNotEqual(obs['date_start'], None) From 0700b6daf88ac935561acde7ab017d680e51325d Mon Sep 17 00:00:00 2001 From: Jose Navas Date: Sat, 20 Dec 2014 18:55:45 -0700 Subject: [PATCH 3/6] The test module should be importable in order to be able to run the tests using nosetests and ipython. Ipython imports the caller file internally, so if it is not importable, it sillently fails and is terrible to debug --- moi/test/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 moi/test/__init__.py diff --git a/moi/test/__init__.py b/moi/test/__init__.py new file mode 100644 index 0000000..e69de29 From 10d023e6cc99b9c135b5f2127f14aed1ef657b6f Mon Sep 17 00:00:00 2001 From: Jose Navas Date: Sat, 20 Dec 2014 18:58:09 -0700 Subject: [PATCH 4/6] Removing toredis from setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index da5e21d..8bb9597 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ test_suite='nose.collector', packages=['moi'], extras_require={'test': ["nose >= 0.10.1", "pep8", 'mock']}, - install_requires=['future==0.13.0', 'tornado==3.1.1', 'toredis', 'redis', + install_requires=['future==0.13.0', 'tornado==3.1.1', 'redis', 'ipython[all]'], classifiers=classifiers ) From 5e18f081afccbaa0f4957017d9c3010f6a9dd796 Mon Sep 17 00:00:00 2001 From: Jose Navas Date: Sat, 20 Dec 2014 19:04:31 -0700 Subject: [PATCH 5/6] Adding travis --- .travis.yml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f3d096c --- /dev/null +++ b/.travis.yml @@ -0,0 +1,26 @@ +language: python +env: + - PYTHON_VERSION=2.7 + - PYTHON_VERSION=3.4 +before_install: + - wget http://repo.continuum.io/miniconda/Miniconda3-3.7.3-Linux-x86_64.sh -O miniconda.sh + - chmod +x miniconda.sh + - ./miniconda.sh -b + - export PATH=/home/travis/miniconda3/bin:$PATH + # Update conda itself + - conda update --yes conda +install: + - conda create --yes -n env_name python=$PYTHON_VERSION pip nose flake8 + - source activate env_name + - pip install coveralls + - pip install . +script: + - export MOI_CONFIG_FP=`pwd`/moi_config.txt + - ipython profile create moi_profile --parallel + - ipcluster -n 2 --profile moi_profile & + - nosetest --with-coverage + - flake8 moi setup.py +services: + - redis-server +after_success: + - coveralls \ No newline at end of file From 8f65473ec286c6fb3e0badb1b2fa5b2acf2ee06f Mon Sep 17 00:00:00 2001 From: Jose Navas Date: Sat, 20 Dec 2014 19:05:45 -0700 Subject: [PATCH 6/6] modifying moi_config --- moi_config.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/moi_config.txt b/moi_config.txt index ed595a7..410d2c1 100644 --- a/moi_config.txt +++ b/moi_config.txt @@ -5,5 +5,5 @@ password = db = 0 [ipython] -context=qiita_general,qiita_reserved -default=qiita_general +context=moi_profile,moi_profile +default=moi_profile