Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outgoing phases, default responses #2

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
975 changes: 975 additions & 0 deletions rapidsms_httprouter/managers.py

Large diffs are not rendered by default.

33 changes: 16 additions & 17 deletions rapidsms_httprouter/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import datetime

from django.db import models, connections
from django.db.models.query import QuerySet
from django.db import models

from rapidsms.models import Contact, Connection

from .managers import ForUpdateManager, BulkInsertManager

DIRECTION_CHOICES = (
("I", "Incoming"),
("O", "Outgoing"))
Expand All @@ -31,29 +31,19 @@
# See: https://coderanger.net/2011/01/select-for-update/
#

class ForUpdateQuerySet(QuerySet):
def for_single_update(self):
if 'sqlite' in connections[self.db].settings_dict['ENGINE'].lower():
# Noop on SQLite since it doesn't support FOR UPDATE
return self
sql, params = self.query.get_compiler(self.db).as_sql()
return self.model._default_manager.raw(sql.rstrip() + ' LIMIT 1 FOR UPDATE', params)

class ForUpdateManager(models.Manager):
def get_query_set(self):
return ForUpdateQuerySet(self.model, using=self._db)

class Message(models.Model):
connection = models.ForeignKey(Connection, related_name='messages')
text = models.TextField()
direction = models.CharField(max_length=1, choices=DIRECTION_CHOICES)
status = models.CharField(max_length=1, choices=STATUS_CHOICES)
date = models.DateTimeField(auto_now_add=True)

in_response_to = models.ForeignKey('self', related_name='responses', null=True, blank=True)
in_response_to = models.ForeignKey('self', related_name='responses', null=True)
application = models.CharField(max_length=100, null=True)

# set our manager to our update manager
objects = ForUpdateManager()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'm missing how the above functionality has been replaced. How does a multi worker gunicorn config work now without dupe messages?

bulk = BulkInsertManager()

def __unicode__(self):
# crop the text (to avoid exploding the admin)
Expand All @@ -69,4 +59,13 @@ def as_json(self):
direction=self.direction, status=self.status, text=self.text,
date=self.date.isoformat())


@classmethod
def mass_text(cls, text, connections, status='P'):
for connection in connections:
Message.bulk.bulk_insert(
send_pre_save=False,
text=text,
direction='O',
status=status,
connection=connection)
return Message.bulk.bulk_insert_commit(send_post_save=False, autoclobber=True)
67 changes: 43 additions & 24 deletions rapidsms_httprouter/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ def run(self):
# if it wasn't cancelled, send it off
if send_msg:
self.send_message(outgoing_message)
self._isbusy = False
time.sleep(getattr(settings, 'WORKER_SLEEP_SHORT', 0.25))
else:
# if there weren't any messages queued, back off
# from polling the DB
self._isbusy = False
time.sleep(getattr(settings, 'WORKER_SLEEP_LONG',1))

except:
import traceback
traceback.print_exc()
Expand Down Expand Up @@ -282,6 +290,10 @@ def handle_incoming(self, backend, sender, text):
# mark the message handled to avoid the
# default phase firing unnecessarily
msg.handled = True
outgoing_db_lock.acquire()
db_message.application = app
db_message.save()
outgoing_db_lock.release()
break

elif phase == "default":
Expand All @@ -304,16 +316,15 @@ def handle_incoming(self, backend, sender, text):
# now send the message responses
while msg.responses:
response = msg.responses.pop(0)
self.handle_outgoing(response, db_message)
self.handle_outgoing(response, db_message, db_message.application)

# we are no longer interested in this message... but some crazy
# synchronous backends might be, so mark it as processed.
msg.processed = True

return db_message


def add_outgoing(self, connection, text, source=None, status='Q'):
def add_outgoing(self, connection, text, source=None, status='Q', application=None):
"""
Adds a message to our outgoing queue, this is a non-blocking action
"""
Expand All @@ -323,13 +334,12 @@ def add_outgoing(self, connection, text, source=None, status='Q'):
text=text,
direction='O',
status=status,
in_response_to=source)
in_response_to=source,
application=application)
outgoing_db_lock.release()

self.info("SMS[%d] OUT (%s) : %s" % (db_message.id, str(connection), text))

global outgoing_worker_threads

# if we have no ROUTER_URL configured, then immediately process our outgoing phases
# and leave the message in the queue
if not getattr(settings, 'ROUTER_URL', None):
Expand All @@ -342,28 +352,33 @@ def add_outgoing(self, connection, text, source=None, status='Q'):
# otherwise, fire up any threads we need to send the message out
else:
# check for available worker threads in the pool, add one if necessary
num_workers = getattr(settings, 'ROUTER_WORKERS', 5)
all_busy = True
for worker in outgoing_worker_threads:
if not worker.is_busy():
all_busy = False
break

if all_busy and len(outgoing_worker_threads) < num_workers:
worker = HttpRouterThread()
worker.daemon = True # they don't need to quit gracefully
worker.start()
outgoing_worker_threads.append(worker)
self.check_workers()

return db_message

def handle_outgoing(self, msg, source=None):

def check_workers(self):
global outgoing_worker_threads
# check for available worker threads in the pool, add one if necessary
num_workers = getattr(settings, 'ROUTER_WORKERS', 5)
all_busy = True
for worker in outgoing_worker_threads:
if not worker.is_busy():
all_busy = False
break

if all_busy and len(outgoing_worker_threads) < num_workers:
worker = HttpRouterThread()
worker.daemon = True # they don't need to quit gracefully
worker.start()
outgoing_worker_threads.append(worker)

def handle_outgoing(self, msg, source=None, application=None):
"""
Sends the passed in RapidSMS message off. Optionally ties the outgoing message to the incoming
message which triggered it.
"""
# add it to our outgoing queue
db_message = self.add_outgoing(msg.connection, msg.text, source, status='P')
db_message = self.add_outgoing(msg.connection, msg.text, source, status='P', application=application)

return db_message

Expand Down Expand Up @@ -436,7 +451,7 @@ def add_app(self, module_name):
return app


def start(self):
def start(self, start_workers=False):
"""
Initializes our router.
TODO: this happens in the HTTP thread on the first call, that could be bad.
Expand All @@ -453,14 +468,18 @@ def start(self):
# upon first starting up
self.outgoing = [message for message in Message.objects.filter(status='Q')]

# kick start one worker
if start_workers:
self.check_workers()

# mark ourselves as started
self.started = True

# we'll get started when we first get used
http_router = HttpRouter()
http_router_lock = Lock()

def get_router():
def get_router(start_workers=False):
"""
Takes care of performing lazy initialization of the www router.
"""
Expand All @@ -471,7 +490,7 @@ def get_router():
http_router_lock.acquire()
try:
if not http_router.started:
http_router.start()
http_router.start(start_workers)
finally:
http_router_lock.release()

Expand Down
66 changes: 66 additions & 0 deletions rapidsms_httprouter/templates/router/summary.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{% extends "layout.html" %}

{% block title %}Message Report - {{ block.super }}{% endblock %}

{% block content %}
<div class="module">
<h2>Message Summary</h2>
<table>
<thead>
<tr>
<td rowspan="2">Month</td>
<td rowspan="2">Year</td>
{% regroup messages|dictsort:"connection__backend__name" by connection__backend__name as link_list %}
{% for backend in link_list %}
<td colspan="2">{{ backend.grouper }}</td>
{% endfor %}
</tr>
<tr>
{% for backend in link_list %}
<td>Incoming</td>
<td>Outgoing</td>
{% endfor %}
</tr>
</thead>
<tbody>
{% regroup messages by year as year_list %}
{% for year in year_list %}
{% regroup year.list by month as month_list %}
{% for month in month_list %}
<tr>
<td>{{ month.grouper|floatformat }}</td>
<td>{{ year.grouper|floatformat }}</td>
{% regroup month.list by connection__backend__name as mlink_list %}
{% for backend in link_list %}
{% for local_backend in mlink_list %}
{% ifequal backend.grouper local_backend.grouper %}
{% comment %}Filthy hack for setting a variable{% endcomment %}
{% regroup local_backend.list by direction as foundit %}
{% if local_backend.list.0.direction == 'I' %}
<td>{{ local_backend.list.0.total }}</td>
{% if local_backend.list.1.direction == 'O' %}
<td>{{ local_backend.list.1.total }}</td>
{% else %}
<td>0</td>
{% endif %}
{% else %}
<td>0</td>
<td>{{ local_backend.list.0.total }}</td>
{% endif %}
{% endifequal %}
{% if forloop.last %}
{% if not foundit %}
<td>0</td>
<td>0</td>
{% endif %}
{% endif %}
{% endfor %}

{% endfor %}
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>
</div>
{% endblock %}
22 changes: 22 additions & 0 deletions rapidsms_httprouter/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,28 @@ def testAddMessage(self):
msg4 = router.add_message('test', 'asdfASDF', 'test', 'I', 'P')
self.assertEquals('asdfasdf', msg4.connection.identity)

def testAddBulk(self):
connection2 = Connection.objects.create(backend=self.backend, identity='8675309')
connection3 = Connection.objects.create(backend=self.backend, identity='8675310')
connection4 = Connection.objects.create(backend=self.backend, identity='8675311')

# test that mass texting works with a single number
msgs = Message.mass_text('Jenny I got your number', [self.connection])

self.assertEquals(msgs.count(), 1)
self.assertEquals(msgs[0].text, 'Jenny I got your number')

# test no connections are re-created
self.assertEquals(msgs[0].connection.pk, self.connection.pk)

msgs = Message.mass_text('Jenny dont change your number', [self.connection,connection2,connection3,connection4],status='L')
self.assertEquals(str(msgs.values_list('status', flat=True).distinct()[0]), 'L')
self.assertEquals(msgs.count(), 4)

# test duplicate connections don't create duplicate messages
msgs = Message.mass_text('Turbo King is the greatest!', [self.connection,self.connection])
self.assertEquals(msgs.count(), 1)

def testRouter(self):
router = get_router()

Expand Down
5 changes: 3 additions & 2 deletions rapidsms_httprouter/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
# vim: ai ts=4 sts=4 et sw=4

from django.conf.urls.defaults import *
from .views import receive, outbox, delivered, console
from .views import receive, outbox, delivered, console, summary
from django.contrib.admin.views.decorators import staff_member_required

urlpatterns = patterns("",
("^router/receive", receive),
("^router/outbox", outbox),
("^router/delivered", delivered),
("^router/console", staff_member_required(console), {}, 'httprouter-console')
("^router/console", staff_member_required(console), {}, 'httprouter-console'),
("^router/summary", summary),
)
13 changes: 13 additions & 0 deletions rapidsms_httprouter/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from django.http import HttpResponse
from django.template import RequestContext
from django.shortcuts import render_to_response
from django.contrib.auth.decorators import login_required
from django.conf import settings;
from django.db.models import Count
from django.db.models import Q
from django.core.paginator import *

Expand Down Expand Up @@ -192,3 +194,14 @@ def console(request):
}, context_instance=RequestContext(request)
)

@login_required
def summary(request):
messages = Message.objects.extra(
{'year':'extract(year from date)',
'month':'extract (month from date)'})\
.values('year','month','connection__backend__name','direction')\
.annotate(total=Count('id'))\
.extra(order_by=['year','month','connection__backend__name','direction'])
return render_to_response(
"router/summary.html",
{ 'messages': messages},context_instance=RequestContext(request))