-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove toredis #15
base: master
Are you sure you want to change the base?
Remove toredis #15
Changes from all commits
2ecc84b
78293c1
0700b6d
10d023e
5e18f08
8f65473
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
language: python | ||
env: | ||
- PYTHON_VERSION=2.7 | ||
- PYTHON_VERSION=3.4 | ||
before_install: | ||
- wget http://repo.continuum.io/miniconda/Miniconda3-3.7.3-Linux-x86_64.sh -O miniconda.sh | ||
- chmod +x miniconda.sh | ||
- ./miniconda.sh -b | ||
- export PATH=/home/travis/miniconda3/bin:$PATH | ||
# Update conda itself | ||
- conda update --yes conda | ||
install: | ||
- conda create --yes -n env_name python=$PYTHON_VERSION pip nose flake8 | ||
- source activate env_name | ||
- pip install coveralls | ||
- pip install . | ||
script: | ||
- export MOI_CONFIG_FP=`pwd`/moi_config.txt | ||
- ipython profile create moi_profile --parallel | ||
- ipcluster -n 2 --profile moi_profile & | ||
- nosetest --with-coverage | ||
- flake8 moi setup.py | ||
services: | ||
- redis-server | ||
after_success: | ||
- coveralls |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,11 +10,10 @@ | |
|
||
from uuid import uuid4 | ||
|
||
import toredis | ||
from redis import ResponseError | ||
from tornado.escape import json_decode, json_encode | ||
|
||
from moi import r_client, ctx_default | ||
from moi import r_client, ctx_default, pubsub | ||
|
||
_children_key = lambda x: x + ':children' | ||
_pubsub_key = lambda x: x + ':pubsub' | ||
|
@@ -32,14 +31,15 @@ class Group(object): | |
`dict`. Any return is ignored. | ||
""" | ||
def __init__(self, group, forwarder=None): | ||
self.toredis = toredis.Client() | ||
self.toredis.connect() | ||
|
||
# Keep track of the channels that we are listening along with the | ||
# function that we will need to execute to unsubscribe from the channel | ||
# Format: {pubsub_key: (id, function)} | ||
self._listening_to = {} | ||
|
||
self.group = group | ||
self.group_children = _children_key(group) | ||
self.group_pubsub = _pubsub_key(group) | ||
self.group_unsubscriber = None | ||
|
||
if forwarder is None: | ||
self.forwarder = lambda x: None | ||
|
@@ -70,9 +70,10 @@ def __del__(self): | |
|
||
def close(self): | ||
"""Unsubscribe the group and all jobs being listened too""" | ||
for channel in self._listening_to: | ||
self.toredis.unsubscribe(channel) | ||
self.toredis.unsubscribe(self.group_pubsub) | ||
for _, unsubscriber in self._listening_to.values(): | ||
unsubscriber() | ||
|
||
self.group_unsubscriber() | ||
|
||
def _decode(self, data): | ||
try: | ||
|
@@ -83,19 +84,21 @@ def _decode(self, data): | |
@property | ||
def jobs(self): | ||
"""Get the known job IDs""" | ||
return self._listening_to.values() | ||
return [val[0] for val in self._listening_to.values()] | ||
|
||
def listen_for_updates(self): | ||
"""Attach a callback on the group pubsub""" | ||
self.toredis.subscribe(self.group_pubsub, callback=self.callback) | ||
self.group_unsubscriber = pubsub.subscribe(self.group_pubsub, | ||
self.callback) | ||
|
||
def listen_to_node(self, id_): | ||
"""Attach a callback on the job pubsub if it exists""" | ||
if r_client.get(id_) is None: | ||
return | ||
else: | ||
self.toredis.subscribe(_pubsub_key(id_), callback=self.callback) | ||
self._listening_to[_pubsub_key(id_)] = id_ | ||
key = _pubsub_key(id_) | ||
unsubscriber = pubsub.subscribe(key, callback=self.callback) | ||
self._listening_to[key] = (id_, unsubscriber) | ||
return id_ | ||
|
||
def unlisten_to_node(self, id_): | ||
|
@@ -114,8 +117,9 @@ def unlisten_to_node(self, id_): | |
id_pubsub = _pubsub_key(id_) | ||
|
||
if id_pubsub in self._listening_to: | ||
_, unsubcriber = self._listening_to[id_pubsub] | ||
del self._listening_to[id_pubsub] | ||
self.toredis.unsubscribe(id_pubsub) | ||
unsubcriber() | ||
|
||
parent = json_decode(r_client.get(id_)).get('parent', None) | ||
if parent is not None: | ||
|
@@ -282,7 +286,8 @@ def _action_get(self, ids): | |
ids = self.jobs | ||
result = [] | ||
|
||
ids = set(ids) | ||
ids_seen = set() | ||
ids = list(ids) | ||
while ids: | ||
id_ = ids.pop() | ||
|
||
|
@@ -305,7 +310,10 @@ def _action_get(self, ids): | |
|
||
if payload['type'] == 'group': | ||
for obj in self._traverse(id_): | ||
ids.add(obj['id']) | ||
id_ = obj['id'] | ||
if id_ not in ids_seen: | ||
ids.add(id_) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A |
||
ids_seen.add(id_) | ||
|
||
return result | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
r"""Pub-sub communication""" | ||
|
||
# ----------------------------------------------------------------------------- | ||
# Copyright (c) 2014--, The qiita Development Team. | ||
# | ||
# Distributed under the terms of the BSD 3-clause License. | ||
# | ||
# The full license is in the file LICENSE, distributed with this software. | ||
# ----------------------------------------------------------------------------- | ||
|
||
from tornado.ioloop import PeriodicCallback | ||
|
||
from collections import defaultdict | ||
|
||
|
||
class PubSub(object): | ||
def __init__(self, r_client, tornado_ioloop): | ||
self.channel_listeners = defaultdict(list) | ||
self._pubsub = r_client.pubsub(ignore_subscribe_messages=True) | ||
self._callback = PeriodicCallback(self.get_message, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Execute the callback every 1ms? That's pretty aggressive. Was it not possible to use a handler ( |
||
self._callback.start() | ||
|
||
def __del__(self): | ||
# Tell tornado to stop asking as | ||
self._callback.stop() | ||
for key in self.channel_listeners: | ||
self._pubsub.unsubscribe(key) | ||
self._pubsub.close() | ||
|
||
def subscribe(self, channel, callback): | ||
"""Subscribe a callback to a channel | ||
|
||
Parameters | ||
---------- | ||
channel : str | ||
The channel to register the callback | ||
callback : function | ||
The function to be registered | ||
|
||
Returns | ||
------- | ||
function | ||
The function to call in order to unsubscribe from the channel | ||
""" | ||
# If the current object was not already listening to the channel, | ||
# subscribe to the channel | ||
if channel not in self.channel_listeners: | ||
self._pubsub.subscribe(channel) | ||
|
||
# Add the callback to the channel's listener list | ||
self.channel_listeners[channel].append(callback) | ||
|
||
# Create the function used to unsubscribe the callback | ||
def destructor(): | ||
self.channel_listeners[channel].remove(callback) | ||
# Do some clean-up: if there is no body listen to a channel | ||
# unsubscribe the object from it | ||
if len(self.channel_listeners[channel]) == 0: | ||
self._pubsub.unsubscribe(channel) | ||
|
||
# Return the unsubscribe function | ||
return destructor | ||
|
||
def get_message(self): | ||
"""Callback for the tornado's IOLoop""" | ||
# Get a message (if exists) | ||
message = self._pubsub.get_message() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a possibility of contention or the message box filling up? In other words, if two processes issue a publish at the same time, will only one message be received? That would explain the fine grained polling interval, but does not protect from this likely case especially when scaling out to 100s of processes |
||
if message: | ||
# There is a message! Execute all the callback functions | ||
self._notify(message) | ||
|
||
def _notify(self, message): | ||
"""Notify the message to all it's callback functions | ||
|
||
Parameters | ||
---------- | ||
message : dict | ||
The message received | ||
""" | ||
# Get the channel of the message | ||
channel = message['channel'] | ||
# Get the actual data | ||
data = message['data'] | ||
# Call all the callback functions passing the data received | ||
for callback in self.channel_listeners[channel]: | ||
callback(data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome... thank you for getting travis up