diff --git a/rapidsms_httprouter/managers.py b/rapidsms_httprouter/managers.py new file mode 100644 index 0000000..0146bf5 --- /dev/null +++ b/rapidsms_httprouter/managers.py @@ -0,0 +1,983 @@ +# This Python file uses the following encoding: utf-8 +import os, sys +import datetime +from tempfile import mkstemp +from django.conf import settings +from django.db import models, connection, connections +from django.db.models import signals +from django.db.models.fields import AutoField, DateTimeField, DateField, TimeField, FieldDoesNotExist +from django.db.models.fields.related import ForeignKey, OneToOneField, ManyToManyField +from django.db.models.query import QuerySet + +class ForUpdateQuerySet(QuerySet): + def for_single_update(self): + if 'sqlite' in connections[self.db].settings_dict['ENGINE'].lower(): + # Noop on SQLite since it doesn't support FOR UPDATE + return self + sql, params = self.query.get_compiler(self.db).as_sql() + return self.model._default_manager.raw(sql.rstrip() + ' LIMIT 1 FOR UPDATE', params) + +class ForUpdateManager(models.Manager): + def get_query_set(self): + return ForUpdateQuerySet(self.model, using=self._db) + +def hash_dict(dictionary): + return hash(frozenset(dictionary.items())) + +class BulkInsertManager(models.Manager): + """ BulkInsertManager Class + contact kgoudeaux@gmail.com + + Public Interface: + bulk_insert(self, now=None, raw=False, send_pre_save=True, **kwargs): + bulk_insert_commit(self, now=None, autoclobber=False, max_depth=5, send_post_save=True, **kwargs) + bulk_insert_now(self, now=None) + + Update 5-9-2008 Added support for self references for Foreign Keys, + OneToOne Fields and Many to Many relations + max_depth default changed to 5 + + Tested on Django SVN 7519 + Currently, only MySQL is supported although a crude framework is there to support others + + Bulk insert objects to a database including Foreign Key, One to One and Many to Many + relationships. + + You can also create self referential relationships of each type including (non)symmetrical + Many to Many relationships. + + Insert an object by calling bulk_insert(kwargs) where kwargs is what you would specify + to create an instance of the underlying model i.e. Model(kwargs). + + Field names in kwargs can also include any related names declared on other models. + + IMPORTANT: Internally, because no primary keys are known until the objects are inserted, + each object is identified by its "kwargs signature" which is a hash of the field names + and values given to bulk_insert combined with any pre_computed default values. + + If you want to insert the _exact_ same data in a bunch of rows, this won't help you. + Objects with the same signature will be treated as if they are the same + object. + + If you specify your own primary key values when the primary key is an AutoField, + your values will be ignored + + ##################### + # Example # + ##################### + Given these three classes: + + class Author(models.Model): + first_name = models.CharField(blank=True, max_length=100) + last_name = models.CharField(blank=True, max_length=100) + + class Article(models.Model): + author = models.ForeignKey(Author, null=True) + text = models.CharField(blank=True, max_length=100, default='') + objects = BulkInsert() + + class Note(models.Model): + article = models.ForeignKey(Article, related_name='notes') + text = models.CharField(blank=True, max_length=100) + + class Character(models.Model): + char = models.CharField(blank=True, max_length=1) + words = models.ManyToManyField(Article, related_name='characters') + + 1. Article.objects.bulk_insert( text="this is gibberish", + author={'first_name':'John', 'last_name':'Smith} + notes=[{'text':'very long'}, {'text':'very short'}], + characters=[{'chars':'t'}, {'chars':'h'}, {'chars':'i'}, {'chars':'s'}, + {'chars':' '}, {'chars':'i'}, {'chars':'s'}, {'chars':' '}, + {'chars':'g'}, {'chars':'i'}, {'chars':'b'}, {'chars':'b'}, + {'chars':'e'}, {'chars':'r'}, {'chars':'i'}, {'chars':'s'}, + {'chars':'h'}]) + 2. Article.objects.bulk_insert( text="this is gibberish", + author={'first_name':'John', 'last_name':'Smith}) + + 3. Article.objects.bulk_insert( text="this is gibberish", + author={'first_name':'John', 'last_name':'Smith}, + notes=Note(text="just right")) + + Article.objects.bulk_insert_commit() + + On commit, the effect of the first call will be to first bulk insert an Author object and recover its primary key. + Then it will insert the Article object referencing the Author object and recover the Article's primary key + Next it will insert two notes and associate them with the Article + Finally it will bulk insert all of the characters and establish each character's many_to_many + relationship with the Article + + Note that there are duplicate Character objects being added, the duplicates will all be treated as + the same object as their "kwargs signatures" are identical + + Also note that the second call to bulk insert will NOT create another article object because its + kwargs signature matches the first call. Only fields stored in the Article's table are used to compute + the signature + + The third call to bulk_insert will add an additional note. Notice that it's a Note object + with an unset primary key. It will be converted into a set of kwargs and bulk inserted accordingly. + If its primary key were set, the key would be extracted to set the relationships but the object would + not be re-inserted + + For this example, the pre_save signal will be called *three* times for the Article object and post_save + will be called once + + ##################### + # Real Example # + ##################### + Saving related search terms from the Yahoo.jp API in a general + WebQuery table. A maximum of 100 related terms are retrieved at a time + + class WebQuery(models.Model): + query = models.CharField(blank=False, null=False, max_length=256) + query_type = models.IntegerField(blank=False, null=False, choices=QUERY_TYPES) + ... + class Meta: + unique_together = ("query", "query_type") + + class RelatedSearch(models.Model): + original = models.CharField(blank=True, max_length=100, unique=True) + queries = models.ManyToManyField(WebQuery, related_name='searches') + ... + + objects = BulkInsert() + + Without bulk insert + ------------------- + queries = [] + r = RelatedSearch.objects.get_or_create(original=original_query) + for q in get_related_searches(original_query): + webquery = WebQuery.objects.get_or_create(query=q, query_type=YAHOOJP_RELATED) + queries.append(webquery.id) + r.queries.add(*queries) + + Worst Case: 101 objects are inserted using 204 db queries + + With Bulk Insert + ---------------- + queries = [] + for q in get_related_searches(original_query): + queries.append({'query': q, 'query_type': YAHOOJP_RELATED}) + RelatedSearch.objects.bulk_insert(original=original_query, queries=queries) + RelatedSearch.objects.bulk_insert_commit() + + Worst Case: 101 objects are inserted with 6 db queries + + ##################### + # Notes # + ##################### + Arguments to bulk_insert can include the underlying models fields, of course, but also + any related name specified by a related class. In other words you can treat relationships + defined on other models as "fields" on this model, similar to QuerySets. + + If the object you wish to specify a relationship with doesn't exist, simply provide a + dict of arguments or a model instance with an unset primary key and it will be queued and + bulk inserted accordingly. + + You don't need to add BulkInsert as a manager to each class you wish to perform bulk + inserts with - just the class you want to perform bulk inserts _relative to_. + + If, when bulk inserting a list of related objects, it cannot find a BulkInsert manager, + it will add one to that class and perform the insert + + To execute the actual insert call bulk_insert_commit. This will execute the insert using an INSERT statement. + + For the file insert to work, write permissions are needed in the current working directory + or settings.BULK_INSERT_DIR + + Also, the mysql user needs file priveleges i.e: + GRANT FILE ON *.* TO 'user'@'localhost'; + Note that for MySQL, File permissions can only be added globally + + Regardless of the number of objects and type of relationships, this will make + 1 database call to perform the insert per table affected and N/100 calls per table + to recover primary keys where N is the number of rows to be inserted + + Generic Foreign Keys Partially Supported - Need a fix like Ticket #5826 + Currently generic foreign keys can only be set by manually specifying the content_type and object_id + + Whereas on a model instance with a GFK, you can do something like: + instance.element = object + instead of: + instance.content_type = ContentType.objects.get_for_model(object.__class__) + instance.object_id = object.id + + There currently isn't a way to recover from the GFK object, the field names that it points to. + So the convenient form of: + Model.objects.bulk_insert(element={'name': 'Jim'}) isn't possible + use: + Create and save Jim object and collect its id + Model.objects.bulk_insert(content_type=ctype, object_id=id) + + ##################### + # Gotchas # + ##################### + -- data types - For MySQL, TEXT and BLOB types cannot be inserted with LOAD DATA INSERT + + -- pre_save signal - It's very possible that pre_save will be called more than once + for the same object when inserting a lot of objects or adding a lot of relationships + + e.g. If you are programmatically generating values and bulk_insert object A. Presave + will be called. If you bulk insert A again with many_to_many relationship B. Presave + will be called again. + + If this is undesirable, specify send_pre_save = False when calling bulk_insert() + + --In a pre_save function, all changes to instance values will be recorded, however, be + careful with computations that produce different results based on time. If pre_save is + called three times for what is nominally the same object and three different values are + set, they will be treated as 3 different objects. + + If this is undesirable, specify send_pre_save = False when calling bulk_insert() + + -- DateTime, Date, and Time fields - If using datetime.now as your default value, + one value for "now" is precomputed and used for all inserts i.e. every date or time + field that uses datetime.now or auto_now or auto_now_add will have the same value + + Think of them as all being inserted at the same instant + + -- Default values that call a function. If you compute a dynamic default value, know + that it will be computed once and cached for all inserted objects + + ##################### + # Self Reference # + ##################### + Self References operate the same as any other reference but there are some powerful + gotchas if you get too creative. This doesn't handle the *general* case of limitless + recursive relationships. If you start seeing warnings and KeyErrors, simplify your inserts. + + Symmetrical is honored for self referencing m2m fields. The direction of relationships can be + bulk inserted by specifiying the m2m field name or the related name. + + In the example below, this is done by setting 'friends' or 'followers' + + Given this class loaded with self references: + class Player(models.Model): + first_name = models.CharField(blank=True, max_length=100) + last_name = models.CharField(blank=True, max_length=100) + + friends = models.ManyToManyField('self', symmetrical=False, related_name='followers', null=True) + favorite = models.ForeignKey('self', related_name='fans', null=True) + antiself = models.OneToOneField('self', related_name='realme', null=True) + + objects = BulkInsert() + + And this set of dicts representing players and their relationships: + other_players = [{'first_name':"Babe", 'last_name':'Ruth', + 'friends': [{'first_name': 'Cal', 'last_name': 'Ripkin'}, + {'first_name': 'Willie', 'last_name': 'Mays'}]}, + {'first_name':"Hank", 'last_name':'Aaron'}] + players = [ {'first_name':'Mark', 'last_name':'McGuire', 'followers':other_players}, + 'antiself': {'first_name': 'Willie', 'last_name': 'Mays'}, + {'first_name':'Sammy', 'last_name':'Sosa', 'favorite':{'first_name':'Hank', 'last_name':'Aaron'}}, + {'first_name':'Johnny', 'last_name':'Bench', 'antiself':{'first_name':'Hank', 'last_name':'Aaron'}}, + {'first_name': 'Mickey', 'last_name': 'Mantle', 'friends': other_players}] + + for player in players: + GibberishAuthor.objects.bulk_insert(**player) + GibberishAuthor.objects.bulk_insert_commit() + + This is a contorted example that pushes the limits but in its unnecessary complexity, shows what can be done. + After commit, a total of *8* unique players will be added: + + Babe Ruth favorite:None antiself:None friends:3 [Cal Ripkin, Willie Mays, Mark McGuire] + Cal Ripkin favorite:None antiself:None friends:0 [] + Willie Mays favorite:None antiself:None friends:0 [] + Hank Aaron favorite:None antiself:None friends:1 [Mark McGuire] + Mark McGuire favorite:None antiself:Willie friends:0 [] + Sammy Sosa favorite:Hank antiself:None friends:0 [] + Johnny Bench favorite:None antiself:Hank friends:0 [] + Mickey Mantle favorite:None antiself:None friends:2 [Babe Ruth, Hank Aaron] + + The first thing to note is that even though some players are referenced more than once, + their kwargs signatures (hash of all non-m2m field values) are identical. + + Be careful with self referential fk and one2one relations that are nested below the first level. + Note that fk's and one2one's were set in 'players' but not in 'other_players'. Inserting fk and + one2one self references breaks the one insert per table design of this manager because fk's and one2one's + must be inserted first. To deeply nest self referential fk's and one2one's, use objects whose primary + key is already set or just provide the primary key value directly. + + To do this on the nested many to many relations above: + other_players = [{'first_name':"Babe", 'last_name':'Ruth', 'friends': [ + {'first_name': 'Cal', 'last_name': 'Ripkin'}, + {'first_name': 'Willie', 'last_name': 'Mays'}], + -------> 'favorite': Player.objects.create(first_name='Bob', last_name='Jones'), + -------> 'antiself': 3}, + {'first_name':"Hank", 'last_name':'Aaron'}] + + If you can, avoid deeply nesting self references or provide model instances with primary keys. + Because the kwargs signature is the only unique identifier, you may unwittingly create two + versions of the same object or worse, it will error out on a KeyError because some distant + self reference changed the kwargs signature of the object. + + You'll probably never need to do something so twisted, but if you do, those are the caveats. + + """ + def __init__(self): + models.Manager.__init__(self) +# super(models.Manager, self).__init__() + self.queue = {} + self.order = {} + #Special Handlers for Self Referencing Objects + self.ref_queue = {} + self.ref_order = {} + self.ref_cache = {} + + self.related_fields = {} + self.related_queue = {} + self.related_classes = {} + + self.m2one_queue = {} + self.m2one_fields = {} + self.m2one_classes = {} + + self.m2m_queue = {} + self.m2m_fields = {} + self.m2m_classes = {} + + self.update_map = {} + self.defaults = {} + + self.initialized = False + self.now = datetime.datetime.now() + self._inherited = False + + def bulk_insert_now(self, now=None): + if now is not None: + self.now = now + else: + self.now = datetime.datetime.now() + + #Some default values may be invalidated by changing 'now' + if self.initialized: + self._collect_field_defaults() + else: + self._related_init() + + def bulk_insert(self, now=None, raw=False, clean_args=False, send_pre_save=True, _self_ref=False, **kwargs): + """ + Hold kwargs in queue until bulk_insert_commit + + All field preprocessing is done unless raw=True + Returns a hash for kwargs if clean_args=True + Primarily for internal use + Sends pre_save signal unless send_pre_save=False + + kwargs can include any field from the underlying model including any related_name's + specified by related models + + """ + if not self.initialized: + #Initialize is delayed until bulk_insert is first called to ensure + #that all relationship hooks have been added to the underlying class + self._related_init() + self.tempModel = self.model() + + if now is not None: + if now != self.now: + self.now = now + self._collect_field_defaults() + + #check for valid field names + self._check_fields(kwargs=kwargs) + + #Determine which related fields are present + fk_or_one2one = set(kwargs.keys()).intersection(set(self.related_fields.keys())) + many_to_one = set(kwargs.keys()).intersection(set(self.m2one_fields.keys())) + many_to_many = set(kwargs.keys()).intersection(set(self.m2m_fields.keys())) + + #pop off m2m and m2one names, the tempModel can't handle them + m2m_dict = {} + for name in many_to_many: + m2m_dict[name] = kwargs.pop(name) + + m2one_dict = {} + for name in many_to_one: + m2one_dict[name] = kwargs.pop(name) + + #Pop off Foreign Key and OneToOne names if they need to be bulk inserted first + related = [] + for name in fk_or_one2one: + if isinstance(kwargs[name], dict): + manager = self._find_bulk_manager(self.related_classes[name]) + sref = self.related_classes[name] == self.model + arg_hash = manager.bulk_insert(now=self.now, clean_args=True, _self_ref=sref, **kwargs[name]) + related += [(name, arg_hash)] + + kwargs.pop(name) + elif isinstance(kwargs[name], self.related_classes[name]) and getattr(kwargs[name], kwargs[name]._meta.pk.attname) is None: + args = self._generate_args(kwargs[name]) + manager = self._find_bulk_manager(self.related_classes[name]) + sref = self.related_classes[name] == self.model + arg_hash = manager.bulk_insert(now=self.now, clean_args=True, _self_ref=sref, **args) + related += [(name, arg_hash)] + + kwargs.pop(name) + + + #Temporary model for signal dispatch and field preprocessing + #self.tempModel = self.model() + for name in kwargs.keys(): + field = self.tempModel._meta.get_field(name) + if isinstance(field, ForeignKey) and isinstance(kwargs[name], int) or isinstance(kwargs[name], long): + setattr(self.tempModel, field.attname, kwargs[name]) + continue + setattr(self.tempModel, field.name, kwargs[name]) + + #Preprocess field data unless 'raw' specified on call + #Special handling for defaults on date and time fields to ensure + #proper formatting for primary key recovery + watch = {} + for f in self.tempModel._meta.fields: + val = getattr(self.tempModel, f.attname) + watch[f.name] = val + if isinstance(f, AutoField): + if f.name in kwargs.keys(): + kwargs[f.name] = f.get_db_prep_save(raw and val or f.pre_save(self.tempModel, True)) + elif f.name in kwargs.keys(): + kwargs[f.name] = f.get_db_prep_save(raw and val or f.pre_save(self.tempModel, True)) + else: + kwargs[f.name] = self.defaults[f.name] + + #Presave could be called more than once for the same object + if send_pre_save: + signals.pre_save.send(sender=self.tempModel.__class__, instance=self.tempModel) + + #Check for changes from pre_save + for f in [field for field in self.tempModel._meta.fields if field.name in kwargs]: + if watch[f.name] != getattr(self.tempModel, f.attname): + kwargs[f.name] = f.get_db_prep_save(raw and val or f.pre_save(self.tempModel, True)) + + #hash to identify this arg:value set + key = hash_dict(kwargs) + + #Objects with the same arg:value signature are considered + #the same object + if _self_ref: + if key not in self.ref_queue: + self.ref_queue[key] = kwargs + self.ref_order[key] = len(self.ref_queue) + elif key not in self.queue: + self.queue[key] = kwargs + self.order[key] = len(self.queue) + + #With the key computed, associate it with any Fk's and one2one's + #that will be inserted later + for name, arg_hash in related: + if arg_hash not in self.related_queue[name]: + self.related_queue[name][arg_hash] = [] + self.related_queue[name][arg_hash] += [key] + + for name in many_to_one: + self._m2one_enqueue(name, m2one_dict[name], key) + + for name in many_to_many: + self._m2m_enqueue(name, m2m_dict[name], key) + + #tempModel = None + self._clear_tempModel() + if clean_args: + return key + def _clear_tempModel(self): + for field in self.tempModel._meta.fields: + val = field.get_default() + setattr(self.tempModel, field.attname, val) + + def bulk_insert_commit(self, now=None, autoclobber=False, depth=0, max_depth=5, send_post_save=True, _self_ref=False, **kwargs): + """ + Bulk inserts all queued objects and relations to the database with one insert per affected table + and N/100 selects to find the primary keys where N is the number of inserted rows + + If autoclobber is False, the default, the insert is performed with IGNORE. Any object that duplicates one + already in the database is not reinserted but any new relationships will be + + if autoclobber is True, the insert is performed with REPLACE, clobbering any duplicates + + If autoclobber is None, no checking is done and any duplicates will raise a Database Integrity Error + + If kwargs is specified, its values are used to overide any model defaults + """ + if not self.queue or depth > max_depth: + return {} + + self._check_fields(no_related=True, kwargs=kwargs) + + many_to_many = filter(lambda x: x['list'] != [], self.m2m_queue.values()) != [] + many_to_one = filter(lambda x: x != [], self.m2one_queue.values()) != [] + related = filter(lambda x: x != [], self.related_queue.values()) != [] + + m2m_depth = filter(lambda x: x['bulk'], self.m2m_queue.values()) + try: + if not _self_ref: + #Foreign Key and OneToOne are inserted first + #Their primary keys are needed to save the root objects + if related: + self._fk_one2one_insert(depth, max_depth, autoclobber) + + #inserting a fk or one2one invalidates our kwargs signatures + #computing new hashes and mapping to the old hash for m2m and m2one + copy = {} + order_copy = {} + self.update_map = {} + for key, value in self.queue.items(): + new_key = hash_dict(value) + self.update_map[key] = new_key + copy[new_key] = value + try: + order_copy[new_key] = self.order[key] + except KeyError: + #For nested inserts with self references, objects + #inserted across those references have no order information + order_copy[new_key] = sys.maxint + self.queue = copy + self.order = order_copy + + + order = self.order.items() + order.sort(lambda x,y: x[1] - y[1]) + else: + #Related Self References are bunched together and inserted once + #however, commit will be called for every self referencing field + #If the commit has already been done, return the cached queue + if not self.ref_queue: + return self.ref_cache, self.update_map + order = self.ref_order.items() + order.sort(lambda x,y: x[1] - y[1]) + + model_results = None + #Saving the root objects + queue = _self_ref and self.ref_queue or self.queue + model_results = self.insert(table=self.model._meta.db_table, + fields=[f for f in self.model._meta.fields if not isinstance(f, AutoField)], + queue=queue, + order = order, + autoclobber=autoclobber) + + if model_results: + model_results = self.filter(pk__in=[obj[0] for obj in model_results]) + + self._recover_pks(_self_ref) + + if not _self_ref: + if many_to_many: + self._many_to_many_insert(depth, max_depth, autoclobber) + if many_to_one: + self._many_to_one_insert(depth, max_depth, autoclobber) + + except Exception, e: + self.reset() + raise Exception, e + + #Dispatch Post Save Signals + if send_post_save: + values = _self_ref and self.ref_queue.values() or self.queue.values() + for args in values: + for name in args.keys(): + setattr(self.tempModel, self.tempModel._meta.get_field(name).attname, args[name]) + signals.post_save.send(sender=self.tempModel.__class__, instance=self.tempModel, created=True) + self._clear_tempModel() + + if depth > 0: + queue = dict(_self_ref and self.ref_queue or self.queue) + update_map = self.update_map + self.reset(_self_ref) + if _self_ref: + self.ref_cache = queue + return queue, update_map + self.reset() + + if depth == 0 and model_results: + return model_results + return {}, {} + + def reset(self, _self_ref=False): + """ + Close and remove any temp files + Reset all queues and field maps + """ + if _self_ref: + self.ref_queue = {} + self.ref_order = {} + return + + self.queue = {} + self.ref_queue = {} + for key in self.m2one_queue.keys(): + self.m2one_queue[key] = [] + for key in self.m2m_queue.keys(): + self.m2m_queue[key] = {'list':[], 'bulk':False} + for key in self.related_queue.keys(): + self.related_queue[key] = {} + self.update_map = {} + self.order = {} + self.ref_order = {} + self.ref_cache = {} + + ################### + # PRIVATE METHODS # + ################### + def _related_init(self): + """ + Find all related forward and reverse Foreign Key, OneToOne, + ManyToMany and ManyToOne relationships and cache the relevant + fields and model classes + """ + self.initialized = True + for f in self.model._meta.fields: + if isinstance(f, ForeignKey) or isinstance(f, OneToOneField): + self.related_classes[f.name] = f.rel.to + self.related_fields[f.name] = f + self.related_queue[f.name] = {} + + for r in self.model._meta.get_all_related_objects(): + name = r.field.rel.related_name or r.model.__name__.lower() + '_set' + self.m2one_classes[name] = r.model + self.m2one_fields[name] = r.field + self.m2one_queue[name] = [] + + for f in self.model._meta.many_to_many: + self.m2m_classes[f.name] = f.rel.to + self.m2m_fields[f.name] = f + self.m2m_queue[f.name] = {'list':[], 'bulk':False} + + for m2m in self.model._meta.get_all_related_many_to_many_objects(): + name = m2m.field.rel.related_name or m2m.model.__name__.lower() + '_set' + self.m2m_classes[name] = m2m.model + self.m2m_fields[name] = m2m.field + self.m2m_queue[name] = {'list':[], 'bulk':False} + + self._collect_field_defaults() + + def _collect_field_defaults(self): + """ + Collect default values for each field + """ + self.defaults = {} + scrapModel = self.model() + for f in scrapModel._meta.fields: + if isinstance(f, DateTimeField) or isinstance(f, DateField) or isinstance(f, TimeField): + if (f.default == datetime.datetime.now or f.auto_now or f.auto_now_add): + if isinstance(f, DateTimeField): + self.defaults[f.name] = self.now.strftime('%Y-%m-%d %H:%M:%S') + elif isinstance(f, DateField): + self.defaults[f.name] = self.now.strftime('%Y-%m-%d') + elif isinstance(f, TimeField): + self.defaults[f.name] = self.now.strftime('%H:%M:%S') + continue + if not isinstance(f, AutoField): + self.defaults[f.name] = scrapModel._meta.get_field(f.name).get_db_prep_save(f.pre_save(scrapModel, True)) + + def _check_fields(self, no_related=False, kwargs={}): + """ + Check that all fields given to bulk_insert and bulk_insert_commit are valid + """ + if no_related: + valid_fields = set([f.name for f in self.model._meta.fields]) + else: + valid_fields = set([f.name for f in self.model._meta.fields] + self.m2m_fields.keys() + self.m2one_fields.keys()) + invalid_fields = set(kwargs.keys()) - valid_fields + + assert len(invalid_fields) == 0, \ + "Invalid field(s): %s . Acceptable field values: %s . All Arguments: %s" %\ + (', '.join(invalid_fields), ', '.join(valid_fields), ', '.join([str(t) for t in kwargs.items()])) + + def _fk_one2one_insert(self, depth, max_depth, autoclobber): + """ + Commit any related fk or one2one objects to the database + Calls bulk_insert_commit on the related class + """ + for name in self.related_queue.keys(): + if self.related_queue[name]: + manager = self._find_bulk_manager(self.related_classes[name]) + self_ref = self.related_classes[name] == self.model + r_queue, update_map = manager.bulk_insert_commit(now=self.now, depth=depth+1, + max_depth=max_depth, autoclobber=autoclobber, _self_ref=self_ref) + if r_queue: + pk_name = self.related_classes[name]._meta.pk.name + + for arg_hash, keys in self.related_queue[name].items(): + arg_hash = update_map.get(arg_hash, arg_hash) + for key in keys: + try: + self.queue[key][name] = r_queue[arg_hash][pk_name] + except KeyError, e: + if key in self.ref_cache.keys(): + print >> sys.stderr , "Warning: Too many recursive self references on a Foreign Key or OneToOne field class:%s field:%s - Value NOT Saved" % (self.model, name) + else: + raise KeyError, e + + def _many_to_one_insert(self, depth, max_depth, autoclobber): + """ + Delayed bulk insert and commit of many to one relations + """ + for name in self.m2one_queue.keys(): + manager = self._find_bulk_manager(self.m2one_classes[name]) + non_related_name = None + for f in self.m2one_classes[name]._meta.fields: + try: + if f.rel.related_name == name: + non_related_name = f.name + break + except: + pass + else: + non_related_name = name + '_set' + + self_ref = self.m2one_classes[name] == self.model + for args_list, key in self.m2one_queue[name]: + key = self.update_map.get(key, key) + pk = self.queue[key][self.model._meta.pk.name] + for args in args_list: + args[non_related_name] = pk + manager.bulk_insert(now=self.now, _self_ref=self_ref, **args) + manager.bulk_insert_commit(now=self.now, depth=depth+1, max_depth=max_depth, autoclobber=autoclobber, _self_ref=self_ref) + + def _many_to_many_insert(self, depth, max_depth, autoclobber): + """ + Inserts all ManyToMany related objects and their relationships + """ + for name in self.m2m_queue.keys(): + if self.m2m_queue[name]['bulk']: + self_ref = self.m2m_classes[name] == self.model + manager = self._find_bulk_manager(self.m2m_classes[name]) + r_queue, update_map = manager.bulk_insert_commit(now=self.now, depth=depth+1, max_depth=max_depth, autoclobber=autoclobber, _self_ref=self_ref) + if r_queue: + for entry in self.m2m_queue[name]['list']: + for arg_hash in entry['bulk']: + entry['values'] += [r_queue[arg_hash][self.m2m_classes[name]._meta.pk.name]] + else: + warning = "Max recursion depth, %s, exceeded. Some relationships between %s and %s may not be defined." + sys.stderr.write(warning % (max_depth, self.model.__name__, self.m2m_classes[name].__name__)) + + if self.m2m_queue[name]['list']: + table = self.m2m_fields[name].m2m_db_table() + + columns = [self.m2m_fields[name].m2m_column_name(), self.m2m_fields[name].m2m_reverse_name()] + + #Determine the direction of the ManyToMany Relationship + #The special case of a self referential field requires further checking + if self.m2m_fields[name].rel.to == self.model: + if self.m2m_classes[name] != self.model or not filter(lambda x: x.name==name, self.model._meta.many_to_many): + columns.reverse() + + #This value only matters for self referential relations + symmetrical = False + if self.m2m_classes[name] == self.model: + if getattr(self.m2m_fields[name].rel, 'symmetrical', False): + symmetrical = True + for key in self.ref_cache.keys(): + self.queue[key] = self.ref_cache[key] + + self.insert_m2m(table, self.model._meta.pk.name, + columns, self.queue, self.m2m_queue[name], + self.update_map, autoclobber, symmetrical) + + def _recover_pks(self, _self_ref=False): + """ + Store the recovered primary keys in the local queue + Recover them 100 at a time + """ + fields = [f for f in self.model._meta.fields if not isinstance(f, AutoField)] + + qn = connection.ops.quote_name + cursor = connection.cursor() + + if self.model._meta.pk in fields: + return #No keys to recover + + recovery_fields = fields + [self.model._meta.pk] + + table = self.model._meta.db_table + primary = self.model._meta.pk + + pk_index = len(recovery_fields) - 1 + + queue = _self_ref and self.ref_queue or self.queue + + recover_limit = 100 + start = 0 + for end in xrange(recover_limit, len(queue)+recover_limit, recover_limit): + where = [] + query_data = [] + for kwargs in queue.values()[start:end]: + temp = [] + for f in fields: + query_data += [kwargs[f.name]] + if kwargs[f.name] is None: + temp += ['%s is %%s' % (qn(f.column))] + else: + temp += ['%s = %%s' % (qn(f.column))] + where += ['(' + ' AND '.join(temp) + ')'] + + where = ' OR '.join(where) + + sql = "SELECT %s FROM %s WHERE " % \ + (','.join(["%s.%s" % (qn(table), qn(f.column)) for f in recovery_fields]), qn(table)) + \ + where + " ORDER BY %s" % qn(primary.column) + + cursor.execute(sql, query_data) + rows = cursor.fetchall() + + result = [] + for row in rows: + temp = {} + for f, r in zip(recovery_fields[:-1], row): + if isinstance(r, datetime.datetime): + r = r.strftime('%Y-%m-%d %H:%M:%S') + elif isinstance(r, datetime.date): + r = r.strftime('%Y-%m-%d') + elif isinstance(r, datetime.time): + r = r.strftime('%H:%M:%S') + temp[f.name] = r + + try: + queue[hash_dict(temp)][primary.name] = row[pk_index] + except KeyError: + pass + + for q in queue.values()[start:end]: + if primary.name not in q: + raise Exception, "Integrity Error. Object %s could not be inserted" % ', '.join([unicode(k).encode('utf8') + ' : ' + unicode(v).encode('utf8') for k,v in q.items()]) + + start = end + + def _find_bulk_manager(self, cls): + """ + Locate a bulk manager on a related class + If there isn't one, add one + """ + for attr in dir(cls): + try: + m = getattr(cls, attr) + if isinstance(m, self.__class__): + return getattr(cls, attr) + except: + pass + + cls.add_to_class('_bulk_manager', self.__class__()) + return cls._bulk_manager + + def _m2one_enqueue(self, name, value, key): + """ + Queue for the many side of ManyToOne relationships + Can't do anything with them until the primary key for + the root side is available + """ + if not isinstance(value, list): + value = [value] + self.m2one_queue[name] += [(value, key)] + + def _generate_args(self, obj): + """ + If we have been supplied a model object with no primary key, + convert it into a kwargs dictionary + """ + args = {} + for f in obj._meta.fields: + if isinstance(f, AutoField): + continue + args[f.name] = f.get_db_prep_save(f.pre_save(obj, True)) + return args + + def _m2m_enqueue(self, name, value, key): + """ + ManyToMany Queue + This handles dicts of args, integer ids, and model objects with or without primary keys + Or a list with any combination of the above + """ + cls = self.m2m_classes[name] + bulk = [] + self_ref = cls == self.model + pk = self.model._meta.pk.name + if not isinstance(value, list): + value = [value] + new_value = [] + for v in value: + if isinstance(v, dict): + if v.get(pk, None): + new_value += [v[pk]] + manager = self._find_bulk_manager(cls) + bulk += [manager.bulk_insert(now=self.now, clean_args=True, _self_ref=self_ref, **v)] + elif isinstance(v, cls): + if getattr(v, pk, None) is None: + args = self._generate_args(v) + manager = self._find_bulk_manager(cls) + bulk = [manager.bulk_insert(now=self.now, clean_args=True, _self_ref=self_ref, **args)] + else: + new_value += [getattr(v, pk)] + elif isinstance(value, (int,long)): + new_value += [long(v)] + else: + raise ValueError, "ManyToMany list argument, %s=%s, must contain numbers, dicts or instances of %s" %\ + (name, value, cls.__name__) + + if bulk: + self.m2m_queue[name]['bulk'] = True + self.m2m_queue[name]['list'] += [{'values':new_value, 'key':key, 'bulk':bulk}] + + def insert_m2m(self, table, primary_key_name, columns, queue, m2m_queue, update_map, autoclobber, symmetrical): + qn = connection.ops.quote_name + cursor = connection.cursor() + + if autoclobber is None or autoclobber == True: + autoclobber = '' + else: + autoclobber = 'IGNORE' + + sql = u'INSERT %s INTO %s (%s) ' % \ + (autoclobber, qn(table), ', '.join([qn(c) for c in columns])) + + value_list = [] + for obj in m2m_queue['list']: + for value in obj['values']: + key = update_map.get(obj['key'], obj['key']) + + value_to_add = [queue[key][primary_key_name], value] + if value_to_add not in value_list: + value_list.append(value_to_add) + + if symmetrical: + value_to_add = [value, queue[key][primary_key_name]] + if value_to_add not in value_list: + value_list.append(value_to_add) + + arg_string = ', '.join([u'(%s,%s)'] * len(value_list)) + values = 'VALUES %s' % arg_string + + flat_value_list = [] + for v in value_list: + flat_value_list.append(v[0]) + flat_value_list.append(v[1]) + + sql = sql + values + cursor.execute(sql, flat_value_list) + + def insert(self, table, fields, queue, order, autoclobber=False): + """ + Bulk insert using INSERT + ***Limited by max packet size on mysql server*** + """ + qn = connection.ops.quote_name + cursor = connection.cursor() + + if autoclobber is None or autoclobber == True: + autoclobber = '' + else: + autoclobber = 'IGNORE' + + sql = u'INSERT %s INTO %s (%s) ' % \ + (autoclobber, qn(table), ', '.join([qn(f.column) for f in fields])) + + value_list = [] + for key, order in order: + kwargs = queue[key] + value_list += [kwargs[f.name] for f in fields] + + arg_string = ', '.join([u'(' + ','.join(['%s']*len(fields)) + ')'] * len(queue.values())) + values = 'VALUES %s' % arg_string + + sql = sql + values + (" RETURNING %s.%s" % (qn(table), qn('id'))) + cursor.execute(sql, value_list) + return cursor.fetchall() + diff --git a/rapidsms_httprouter/models.py b/rapidsms_httprouter/models.py index 7f5af0c..4016318 100644 --- a/rapidsms_httprouter/models.py +++ b/rapidsms_httprouter/models.py @@ -1,10 +1,10 @@ import datetime - -from django.db import models, connections -from django.db.models.query import QuerySet +from django.db import models from rapidsms.models import Contact, Connection +from .managers import ForUpdateManager, BulkInsertManager + DIRECTION_CHOICES = ( ("I", "Incoming"), ("O", "Outgoing")) @@ -31,18 +31,6 @@ # See: https://coderanger.net/2011/01/select-for-update/ # -class ForUpdateQuerySet(QuerySet): - def for_single_update(self): - if 'sqlite' in connections[self.db].settings_dict['ENGINE'].lower(): - # Noop on SQLite since it doesn't support FOR UPDATE - return self - sql, params = self.query.get_compiler(self.db).as_sql() - return self.model._default_manager.raw(sql.rstrip() + ' LIMIT 1 FOR UPDATE', params) - -class ForUpdateManager(models.Manager): - def get_query_set(self): - return ForUpdateQuerySet(self.model, using=self._db) - class Message(models.Model): connection = models.ForeignKey(Connection, related_name='messages') text = models.TextField() @@ -50,10 +38,12 @@ class Message(models.Model): status = models.CharField(max_length=1, choices=STATUS_CHOICES) date = models.DateTimeField(auto_now_add=True) - in_response_to = models.ForeignKey('self', related_name='responses', null=True, blank=True) + in_response_to = models.ForeignKey('self', related_name='responses', null=True) + application = models.CharField(max_length=100, null=True) # set our manager to our update manager objects = ForUpdateManager() + bulk = BulkInsertManager() def __unicode__(self): # crop the text (to avoid exploding the admin) @@ -69,4 +59,13 @@ def as_json(self): direction=self.direction, status=self.status, text=self.text, date=self.date.isoformat()) - + @classmethod + def mass_text(cls, text, connections, status='P'): + for connection in connections: + Message.bulk.bulk_insert( + send_pre_save=False, + text=text, + direction='O', + status=status, + connection=connection) + return Message.bulk.bulk_insert_commit(send_post_save=False, autoclobber=True) diff --git a/rapidsms_httprouter/router.py b/rapidsms_httprouter/router.py index 2acb3dd..6bd621d 100644 --- a/rapidsms_httprouter/router.py +++ b/rapidsms_httprouter/router.py @@ -74,6 +74,14 @@ def run(self): # if it wasn't cancelled, send it off if send_msg: self.send_message(outgoing_message) + self._isbusy = False + time.sleep(getattr(settings, 'WORKER_SLEEP_SHORT', 0.25)) + else: + # if there weren't any messages queued, back off + # from polling the DB + self._isbusy = False + time.sleep(getattr(settings, 'WORKER_SLEEP_LONG',1)) + except: import traceback traceback.print_exc() @@ -282,6 +290,10 @@ def handle_incoming(self, backend, sender, text): # mark the message handled to avoid the # default phase firing unnecessarily msg.handled = True + outgoing_db_lock.acquire() + db_message.application = app + db_message.save() + outgoing_db_lock.release() break elif phase == "default": @@ -304,7 +316,7 @@ def handle_incoming(self, backend, sender, text): # now send the message responses while msg.responses: response = msg.responses.pop(0) - self.handle_outgoing(response, db_message) + self.handle_outgoing(response, db_message, db_message.application) # we are no longer interested in this message... but some crazy # synchronous backends might be, so mark it as processed. @@ -312,8 +324,7 @@ def handle_incoming(self, backend, sender, text): return db_message - - def add_outgoing(self, connection, text, source=None, status='Q'): + def add_outgoing(self, connection, text, source=None, status='Q', application=None): """ Adds a message to our outgoing queue, this is a non-blocking action """ @@ -323,13 +334,12 @@ def add_outgoing(self, connection, text, source=None, status='Q'): text=text, direction='O', status=status, - in_response_to=source) + in_response_to=source, + application=application) outgoing_db_lock.release() self.info("SMS[%d] OUT (%s) : %s" % (db_message.id, str(connection), text)) - global outgoing_worker_threads - # if we have no ROUTER_URL configured, then immediately process our outgoing phases # and leave the message in the queue if not getattr(settings, 'ROUTER_URL', None): @@ -342,28 +352,33 @@ def add_outgoing(self, connection, text, source=None, status='Q'): # otherwise, fire up any threads we need to send the message out else: # check for available worker threads in the pool, add one if necessary - num_workers = getattr(settings, 'ROUTER_WORKERS', 5) - all_busy = True - for worker in outgoing_worker_threads: - if not worker.is_busy(): - all_busy = False - break - - if all_busy and len(outgoing_worker_threads) < num_workers: - worker = HttpRouterThread() - worker.daemon = True # they don't need to quit gracefully - worker.start() - outgoing_worker_threads.append(worker) + self.check_workers() return db_message - - def handle_outgoing(self, msg, source=None): + + def check_workers(self): + global outgoing_worker_threads + # check for available worker threads in the pool, add one if necessary + num_workers = getattr(settings, 'ROUTER_WORKERS', 5) + all_busy = True + for worker in outgoing_worker_threads: + if not worker.is_busy(): + all_busy = False + break + + if all_busy and len(outgoing_worker_threads) < num_workers: + worker = HttpRouterThread() + worker.daemon = True # they don't need to quit gracefully + worker.start() + outgoing_worker_threads.append(worker) + + def handle_outgoing(self, msg, source=None, application=None): """ Sends the passed in RapidSMS message off. Optionally ties the outgoing message to the incoming message which triggered it. """ # add it to our outgoing queue - db_message = self.add_outgoing(msg.connection, msg.text, source, status='P') + db_message = self.add_outgoing(msg.connection, msg.text, source, status='P', application=application) return db_message @@ -436,7 +451,7 @@ def add_app(self, module_name): return app - def start(self): + def start(self, start_workers=False): """ Initializes our router. TODO: this happens in the HTTP thread on the first call, that could be bad. @@ -453,6 +468,10 @@ def start(self): # upon first starting up self.outgoing = [message for message in Message.objects.filter(status='Q')] + # kick start one worker + if start_workers: + self.check_workers() + # mark ourselves as started self.started = True @@ -460,7 +479,7 @@ def start(self): http_router = HttpRouter() http_router_lock = Lock() -def get_router(): +def get_router(start_workers=False): """ Takes care of performing lazy initialization of the www router. """ @@ -471,7 +490,7 @@ def get_router(): http_router_lock.acquire() try: if not http_router.started: - http_router.start() + http_router.start(start_workers) finally: http_router_lock.release() diff --git a/rapidsms_httprouter/templates/router/summary.html b/rapidsms_httprouter/templates/router/summary.html new file mode 100644 index 0000000..d0a2804 --- /dev/null +++ b/rapidsms_httprouter/templates/router/summary.html @@ -0,0 +1,66 @@ +{% extends "layout.html" %} + +{% block title %}Message Report - {{ block.super }}{% endblock %} + +{% block content %} +
Month | +Year | + {% regroup messages|dictsort:"connection__backend__name" by connection__backend__name as link_list %} + {% for backend in link_list %} +{{ backend.grouper }} | + {% endfor %} +||||||
Incoming | +Outgoing | + {% endfor %} +|||||||
{{ month.grouper|floatformat }} | +{{ year.grouper|floatformat }} | + {% regroup month.list by connection__backend__name as mlink_list %} + {% for backend in link_list %} + {% for local_backend in mlink_list %} + {% ifequal backend.grouper local_backend.grouper %} + {% comment %}Filthy hack for setting a variable{% endcomment %} + {% regroup local_backend.list by direction as foundit %} + {% if local_backend.list.0.direction == 'I' %} +{{ local_backend.list.0.total }} | + {% if local_backend.list.1.direction == 'O' %} +{{ local_backend.list.1.total }} | + {% else %} +0 | + {% endif %} + {% else %} +0 | +{{ local_backend.list.0.total }} | + {% endif %} + {% endifequal %} + {% if forloop.last %} + {% if not foundit %} +0 | +0 | + {% endif %} + {% endif %} + {% endfor %} + + {% endfor %} +