Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove toredis #15

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
language: python
env:
  - PYTHON_VERSION=2.7
  - PYTHON_VERSION=3.4
before_install:
  # Miniconda supplies both tested python versions from one installer
  - wget http://repo.continuum.io/miniconda/Miniconda3-3.7.3-Linux-x86_64.sh -O miniconda.sh
  - chmod +x miniconda.sh
  - ./miniconda.sh -b
  - export PATH=/home/travis/miniconda3/bin:$PATH
  # Update conda itself
  - conda update --yes conda
install:
  - conda create --yes -n env_name python=$PYTHON_VERSION pip nose flake8
  - source activate env_name
  - pip install coveralls
  - pip install .
script:
  - export MOI_CONFIG_FP=`pwd`/moi_config.txt
  - ipython profile create moi_profile --parallel
  # ipcluster needs the 'start' subcommand; bare 'ipcluster -n 2' errors out
  - ipcluster start -n 2 --profile moi_profile &
  # The nose test runner binary is 'nosetests', not 'nosetest'
  - nosetests --with-coverage
  - flake8 moi setup.py
services:
  - redis-server
after_success:
  - coveralls
6 changes: 5 additions & 1 deletion moi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
# -----------------------------------------------------------------------------
import os

from tornado.ioloop import IOLoop
from redis import Redis
from future import standard_library
with standard_library.hooks():
from configparser import ConfigParser

from moi.context import Context
from moi.pubsub import PubSub


REDIS_KEY_TIMEOUT = 84600 * 14 # two weeks
Expand All @@ -30,11 +32,13 @@
password=_config.get('redis', 'password'),
db=_config.get('redis', 'db'))

# Instantiate the pubsub object
pubsub = PubSub(r_client, IOLoop.instance())

# setup contexts
ctxs = {name: Context(name)
for name in _config.get('ipython', 'context').split(',')}
ctx_default = _config.get('ipython', 'default')

__version__ = '0.1.0-dev'
__all__ = ['r_client', 'ctxs', 'ctx_default', 'REDIS_KEY_TIMEOUT']
__all__ = ['r_client', 'ctxs', 'ctx_default', 'REDIS_KEY_TIMEOUT', 'pubsub']
38 changes: 23 additions & 15 deletions moi/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@

from uuid import uuid4

import toredis
from redis import ResponseError
from tornado.escape import json_decode, json_encode

from moi import r_client, ctx_default
from moi import r_client, ctx_default, pubsub

_children_key = lambda x: x + ':children'
_pubsub_key = lambda x: x + ':pubsub'
Expand All @@ -32,14 +31,15 @@ class Group(object):
`dict`. Any return is ignored.
"""
def __init__(self, group, forwarder=None):
self.toredis = toredis.Client()
self.toredis.connect()

# Keep track of the channels that we are listening along with the
# function that we will need to execute to unsubscribe from the channel
# Format: {pubsub_key: (id, function)}
self._listening_to = {}

self.group = group
self.group_children = _children_key(group)
self.group_pubsub = _pubsub_key(group)
self.group_unsubscriber = None

if forwarder is None:
self.forwarder = lambda x: None
Expand Down Expand Up @@ -70,9 +70,10 @@ def __del__(self):

def close(self):
"""Unsubscribe the group and all jobs being listened too"""
for channel in self._listening_to:
self.toredis.unsubscribe(channel)
self.toredis.unsubscribe(self.group_pubsub)
for _, unsubscriber in self._listening_to.values():
unsubscriber()

self.group_unsubscriber()

def _decode(self, data):
try:
Expand All @@ -83,19 +84,21 @@ def _decode(self, data):
@property
def jobs(self):
"""Get the known job IDs"""
return self._listening_to.values()
return [val[0] for val in self._listening_to.values()]

def listen_for_updates(self):
"""Attach a callback on the group pubsub"""
self.toredis.subscribe(self.group_pubsub, callback=self.callback)
self.group_unsubscriber = pubsub.subscribe(self.group_pubsub,
self.callback)

def listen_to_node(self, id_):
"""Attach a callback on the job pubsub if it exists"""
if r_client.get(id_) is None:
return
else:
self.toredis.subscribe(_pubsub_key(id_), callback=self.callback)
self._listening_to[_pubsub_key(id_)] = id_
key = _pubsub_key(id_)
unsubscriber = pubsub.subscribe(key, callback=self.callback)
self._listening_to[key] = (id_, unsubscriber)
return id_

def unlisten_to_node(self, id_):
Expand All @@ -114,8 +117,9 @@ def unlisten_to_node(self, id_):
id_pubsub = _pubsub_key(id_)

if id_pubsub in self._listening_to:
_, unsubcriber = self._listening_to[id_pubsub]
del self._listening_to[id_pubsub]
self.toredis.unsubscribe(id_pubsub)
unsubcriber()

parent = json_decode(r_client.get(id_)).get('parent', None)
if parent is not None:
Expand Down Expand Up @@ -282,7 +286,8 @@ def _action_get(self, ids):
ids = self.jobs
result = []

ids = set(ids)
ids_seen = set()
ids = list(ids)
while ids:
id_ = ids.pop()

Expand All @@ -305,7 +310,10 @@ def _action_get(self, ids):

if payload['type'] == 'group':
for obj in self._traverse(id_):
ids.add(obj['id'])
id_ = obj['id']
if id_ not in ids_seen:
ids.add(id_)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A list doesn't have an add method...

ids_seen.add(id_)

return result

Expand Down
2 changes: 1 addition & 1 deletion moi/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _status_change(id, new_status):
str
The old status
"""
job_info = json.loads(r_client.get(id))
job_info = json.loads(r_client.get(id).decode('utf-8'))
old_status = job_info['status']
job_info['status'] = new_status
_deposit_payload(job_info)
Expand Down
86 changes: 86 additions & 0 deletions moi/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
r"""Pub-sub communication"""

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

from tornado.ioloop import PeriodicCallback

from collections import defaultdict


class PubSub(object):
    """Dispatch redis pub/sub messages to per-channel callback functions.

    A single redis pubsub connection is shared by all subscribers; a
    tornado ``PeriodicCallback`` polls it and fans each received message
    out to every callback registered for that message's channel.
    """

    def __init__(self, r_client, tornado_ioloop):
        """Set up the shared pubsub connection and start polling

        Parameters
        ----------
        r_client : redis.Redis
            The redis client used to create the pubsub connection
        tornado_ioloop : tornado.ioloop.IOLoop
            The IOLoop meant to drive the polling.
            NOTE(review): currently unused -- PeriodicCallback attaches
            to the current IOLoop; confirm that matches the caller's loop.
        """
        # Format: {channel: [callback, callback, ...]}
        self.channel_listeners = defaultdict(list)
        self._pubsub = r_client.pubsub(ignore_subscribe_messages=True)
        # Poll every 1 ms. This is aggressive busy-polling; a reactive
        # handler (ioloop.add_handler on the connection's socket) would be
        # preferable, but polling keeps the implementation simple. Each
        # tick drains ALL pending messages (see get_message), so bursts
        # of publishes do not back up between ticks.
        self._callback = PeriodicCallback(self.get_message, 1)
        self._callback.start()

    def __del__(self):
        # Tell tornado to stop polling us, then release the connection
        self._callback.stop()
        for key in self.channel_listeners:
            self._pubsub.unsubscribe(key)
        self._pubsub.close()

    def subscribe(self, channel, callback):
        """Subscribe a callback to a channel

        Parameters
        ----------
        channel : str
            The channel to register the callback
        callback : function
            The function to be registered

        Returns
        -------
        function
            The function to call in order to unsubscribe from the channel.
            Calling it more than once is harmless.
        """
        # Only issue the redis SUBSCRIBE the first time someone listens
        # to this channel; later subscribers share the subscription
        if channel not in self.channel_listeners:
            self._pubsub.subscribe(channel)

        # Add the callback to the channel's listener list
        self.channel_listeners[channel].append(callback)

        # Create the function used to unsubscribe the callback
        def destructor():
            listeners = self.channel_listeners.get(channel)
            if listeners is None or callback not in listeners:
                # Already unsubscribed; make repeated calls a no-op
                return
            listeners.remove(callback)
            # Clean up: if nobody listens to the channel any more,
            # unsubscribe from redis AND drop the (now empty) dict entry.
            # Removing the key is required so that a future subscribe()
            # re-issues the redis SUBSCRIBE for this channel.
            if not listeners:
                self._pubsub.unsubscribe(channel)
                del self.channel_listeners[channel]

        # Return the unsubscribe function
        return destructor

    def get_message(self):
        """Callback for tornado's IOLoop: deliver all pending messages

        Draining the whole queue on every tick (rather than taking a
        single message) prevents a backlog when several publishes arrive
        within one polling interval.
        """
        message = self._pubsub.get_message()
        while message:
            self._notify(message)
            message = self._pubsub.get_message()

    def _notify(self, message):
        """Pass the message's data to all of its channel's callbacks

        Parameters
        ----------
        message : dict
            The message received; expected to carry 'channel' and 'data'
            keys
        """
        channel = message['channel']
        # Under Python 3, redis-py returns channel names as bytes unless
        # decode_responses is enabled (this PR decodes redis bytes
        # elsewhere, e.g. in moi/job.py); normalize so the lookup matches
        # the str keys used at subscribe time.
        if isinstance(channel, bytes):
            channel = channel.decode('utf-8')
        # Get the actual data
        data = message['data']
        # Iterate over a copy so a callback may safely unsubscribe
        # itself (mutating the list) while being notified.
        # .get() avoids growing the defaultdict for unknown channels.
        for callback in list(self.channel_listeners.get(channel, ())):
            callback(data)
Empty file added moi/test/__init__.py
Empty file.
55 changes: 32 additions & 23 deletions moi/test/test_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,22 @@ def test_listen_for_updates(self):
pass # nothing to test...

def test_listen_to_node(self):
self.assertEqual(sorted(self.obj._listening_to.items()),
[('a:pubsub', 'a'),
('b:pubsub', 'b'),
('c:pubsub', 'c')])
exp = {'a:pubsub': 'a',
'b:pubsub': 'b',
'c:pubsub': 'c'}
self.assertEqual(sorted(self.obj._listening_to.keys()),
sorted(exp.keys()))
for key in self.obj._listening_to:
self.assertEqual(self.obj._listening_to[key][0], exp[key])

def test_unlisten_to_node(self):
self.assertEqual(self.obj.unlisten_to_node('b'), 'b')
self.assertEqual(sorted(self.obj._listening_to.items()),
[('a:pubsub', 'a'),
('c:pubsub', 'c')])
exp = {'a:pubsub': 'a',
'c:pubsub': 'c'}
self.assertEqual(sorted(self.obj._listening_to.keys()),
sorted(exp.keys()))
for key in self.obj._listening_to:
self.assertEqual(self.obj._listening_to[key][0], exp[key])
self.assertEqual(self.obj.unlisten_to_node('foo'), None)

def test_callback(self):
Expand Down Expand Up @@ -99,15 +105,18 @@ def __call__(self, data):
self.obj.forwarder = fwd

self.obj.action('add', ['d', 'e'])
self.assertEqual(fwd.result, [

exp = [
{'add': {u'id': u'd', u'name': u'other job', u'type': u'job'}},
{'add': {u'id': u'e', u'name': u'other job e', u'type': u'job'}}])
{'add': {u'id': u'e', u'name': u'other job e', u'type': u'job'}}]

self.assertCountEqual(fwd.result, exp)
self.obj.action('remove', ['e', 'd'])
self.assertEqual(fwd.result, [
{'remove':
{u'id': u'e', u'name': u'other job e', u'type': u'job'}},
{u'id': u'd', u'name': u'other job', u'type': u'job'}},
{'remove':
{u'id': u'd', u'name': u'other job', u'type': u'job'}}])
{u'id': u'e', u'name': u'other job e', u'type': u'job'}}])
self.obj.action('remove', ['d'])
self.assertEqual(fwd.result, [])

Expand All @@ -129,11 +138,11 @@ def __call__(self, data):
self.obj.forwarder = fwd

self.obj.job_action('update', ['a', 'b'])
self.assertEqual(fwd.result, [{'update': {u'id': u'a',
u'name': u'a',
u'type': u'job'}},
{'update': {u'id': u'b',
self.assertEqual(fwd.result, [{'update': {u'id': u'b',
u'name': u'b',
u'type': u'job'}},
{'update': {u'id': u'a',
u'name': u'a',
u'type': u'job'}}])

with self.assertRaises(TypeError):
Expand All @@ -145,8 +154,8 @@ def __call__(self, data):
def test_action_add(self):
resp = self.obj._action_add(['d', 'f', 'e'])
self.assertEqual(resp, [
{u'id': u'd', u'name': u'other job', u'type': u'job'},
{u'id': u'e', u'name': u'other job e', u'type': u'job'}])
{u'id': u'e', u'name': u'other job e', u'type': u'job'},
{u'id': u'd', u'name': u'other job', u'type': u'job'}])
self.assertIn('d:pubsub', self.obj._listening_to)
self.assertIn('e:pubsub', self.obj._listening_to)
self.assertNotIn('f:pubsub', self.obj._listening_to)
Expand All @@ -155,23 +164,23 @@ def test_action_remove(self):
self.obj._action_add(['d', 'f', 'e'])
resp = self.obj._action_remove(['a', 'd', 'f', 'c', 'e'])
self.assertEqual(resp, [
{u'id': u'a', u'name': u'a', u'type': u'job'},
{u'id': u'd', u'name': u'other job', u'type': u'job'},
{u'id': u'e', u'name': u'other job e', u'type': u'job'},
{u'id': u'c', u'name': u'c', u'type': u'job'},
{u'id': u'e', u'name': u'other job e', u'type': u'job'}])
{u'id': u'd', u'name': u'other job', u'type': u'job'},
{u'id': u'a', u'name': u'a', u'type': u'job'}])

self.assertNotIn('a:pubsub', self.obj._listening_to)
self.assertNotIn('c:pubsub', self.obj._listening_to)
self.assertNotIn('d:pubsub', self.obj._listening_to)
self.assertNotIn('e:pubsub', self.obj._listening_to)
self.assertNotIn('f:pubsub', self.obj._listening_to)
self.assertEqual(r_client.smembers('testing:children'), {'b'})
self.assertEqual(r_client.smembers('testing:children'), {b'b'})

def test_action_get(self):
resp = self.obj._action_get(['d', 'f', 'e', None])
self.assertEqual(resp, [
{u'id': u'd', u'name': u'other job', u'type': u'job'},
{u'id': u'e', u'name': u'other job e', u'type': u'job'}])
{u'id': u'e', u'name': u'other job e', u'type': u'job'},
{u'id': u'd', u'name': u'other job', u'type': u'job'}])


if __name__ == '__main__':
Expand Down
Loading