diff --git a/redbiom/_requests.py b/redbiom/_requests.py
index 24b783c..4bedee2 100644
--- a/redbiom/_requests.py
+++ b/redbiom/_requests.py
@@ -53,9 +53,14 @@ def f(context, cmd, payload):
             sys.stdout.write(proto)
             sys.stdout.flush()
     else:
-        def f(context, cmd, payload):
+        def f(context, cmd, payload, verbose=False):
             req = s.post(config['hostname'],
                          data=_format_request(context, cmd, payload))
+
+            if verbose:
+                print(context, cmd, payload[:100])
+                print(req.status_code)
+                print(req.content)
             return _parse_validate_request(req, cmd)

     return f
@@ -103,6 +108,7 @@ def f(sha, *args):
         payload.extend([str(a) for a in args])
         url = '/'.join(payload)
         return json.loads(_parse_validate_request(s.get(url), 'EVALSHA'))
+    return f

diff --git a/redbiom/admin.py b/redbiom/admin.py
index f294731..59db9c6 100644
--- a/redbiom/admin.py
+++ b/redbiom/admin.py
@@ -1,4 +1,6 @@
 from urllib.parse import quote_plus as _quote_plus
+from math import ceil
+import numpy as np


 def quote_plus(s):
@@ -9,13 +11,58 @@ class ScriptManager:
     """Static singleton for managing Lua scripts in the Redis backend"""
     # derived from http://stackoverflow.com/a/43900922/19741
     _scripts = {'get-index': """
-        local kid = redis.call('HGET', KEYS[1], ARGV[1])
-        if not kid then
-          kid = redis.call('HINCRBY', KEYS[1], 'current_id', 1) - 1
-          redis.call('HSET', KEYS[1], ARGV[1], kid)
-          redis.call('HSET', KEYS[1] .. '-inverted', kid, ARGV[1])
+        local indices = {}
+        local kid = nil
+
+        -- for each index and identifier (like Python's enumerate)
+        for position, name in ipairs(ARGV) do
+            kid = redis.call('HGET', KEYS[1], name)
+
+            -- if an identifier was not observed, add it
+            if not kid then
+                kid = redis.call('HINCRBY',
+                                 KEYS[1],
+                                 'current_id', 1) - 1
+                redis.call('HSET', KEYS[1], name, kid)
+                redis.call('HSET', KEYS[1] .. '-inverted', kid, name)
+            end
+
+            -- store the mapping for return
+            indices[position] = tonumber(kid)
+        end
+        return cjson.encode(indices)""",
+                'load-data': """
+        -- Redis has a compile time stack limit for Lua calls,
+        -- so rather than recompiling with an arbitrary limit,
+        -- we're going to instead chunk calls where there are a
+        -- large number of arguments. The default is 8000 for the
+        -- stack size, so we'll use 7900 to be close without
+        -- going over
+        -- https://stackoverflow.com/a/39959618/19741
+        local call_in_chunks = function (command, key, args)
+            local step = 7900
+            for i = 1, #args, step do
+                redis.call(command,
+                           key,
+                           unpack(args,
+                                  i,
+                                  math.min(i + step - 1, #args)))
+            end
+        end
+
+        -- Lua does not have a natural split, for various reasons
+        -- outlined in http://lua-users.org/wiki/SplitJoin,
+        -- so we need to do this manually.
+        -- We'll split on "|", which should be safe as the
+        -- values sent are only ever expected to be integers.
+        for idx, arg in ipairs(ARGV) do
+            local items = {}
+            for item in string.gmatch(arg, "([^|]+)") do
+                table.insert(items, item)
+            end
+            call_in_chunks('LPUSH', KEYS[idx], items)
         end
-        return kid""",
+        return redis.status_reply("OK")""",
                 'fetch-feature': """
         local context = ARGV[1]
         local key = ARGV[2]
@@ -62,7 +109,7 @@ class ScriptManager:
         end
         return cjson.encode(result)"""}

-    _admin_scripts = ('get-index', )
+    _admin_scripts = ('get-index', 'load-data')

     _cache = {}

@@ -209,7 +256,96 @@ def create_context(name, description):
     ScriptManager.load_scripts()


-def load_sample_data(table, context, tag=None, redis_protocol=False):
+def _load_axis_data(table, ids, opposite_ids, opposite_id_index, axis_label,
+                    context, batchsize):
+    """Manage the loading of data for a particular axis
+
+    Parameters
+    ----------
+    table : biom.Table
+        The table to obtain data from
+    ids : iterable of str
+        The IDs to obtain data for
+    opposite_ids : iterable of str
+        The IDs of the opposite axis in the table
+    opposite_id_index : dict
+        The index which maps an opposite ID to the index value within
+        the Redis database for the identifier
+    axis_label : str
+        The biom.Table axis label of ids
+    context : str
+        The context to load the data into
+    batchsize : int
+        The number of identifiers to group into a single request
+
+    Notes
+    -----
+    This method only supports count data.
+
+    Data are loaded through the "load-data" Lua script managed by the
+    ScriptManager. This method in effect packs the data into a structure
+    compatible with Webdis, and the EVALSHA command structure of Redis. The
+    "load-data" script then iterates over the "KEYS" and "ARGV"s, parsing
+    the respective entries into values that can be directly loaded.
+
+    Redis command summary
+    ---------------------
+    EVALSHA <sha1> N <context>:<axis_label>:<id> ... <packeddata> ...
+
+    Note that "N" refers to the number of "KEYS". The "load-data" Lua script
+    assumes that there are "N" "KEYS" as well as "N" "ARGV"s. For the call,
+    "KEYS" are the prefixed identifiers (e.g., "<context>:<axis_label>:<id>")
+    and "ARGV" are the "packeddata". "KEYS" and "ARGV" are expected to be in
+    index order with each other.
+ """ + import redbiom + import redbiom._requests + if axis_label == 'feature': + axis = 'observation' + elif axis_label == 'sample': + axis = 'sample' + else: + raise ValueError("%s is unrecognized as an axis" % axis) + + config = redbiom.get_config() + post = redbiom._requests.make_post(config) + loader_sha = ScriptManager.get('load-data') + + # partition our IDs into smaller batches + splits = max(1, ceil(len(ids) / batchsize)) + for batch in np.array_split(ids, splits): + keys = [] + argv = [] + + # pack the id specific data into a format the Lua logic expects + for id_ in batch: + values = table.data(id_, axis=axis, dense=False) + if not np.allclose(values.data - np.round(values.data, 1), 0.0): + raise ValueError("Data do not appear to be counts") + + int_values = values.astype(int) + remapped = [opposite_id_index[i] + for i in opposite_ids[values.indices]] + + packed = '|'.join(["%d|%d" % (v, i) + for i, v in zip(remapped, int_values.data)]) + + keys.append(f"{context}:{axis_label}:{id_}") + argv.append(packed) + + nkeys = str(len(keys)) + + # load the count data + payload = [loader_sha, nkeys] + keys + argv + post(None, 'EVALSHA', '/'.join(payload)) + + # note which identifiers are represented + payload = f"{axis_label}s-represented/%s" % '/'.join(batch) + post(context, 'SADD', payload, verbose=False) + + +def load_sample_data(table, context, tag=None, redis_protocol=False, + batchsize=1000): """Load nonzero sample data. Parameters @@ -222,6 +358,8 @@ def load_sample_data(table, context, tag=None, redis_protocol=False): A tag to associated the samples with (e.g., a preparation ID). redis_protocol : bool, optional Generate commands for bulk load instead of HTTP requests. + batchsize : int, optional + The number of samples or features to load at once Raises ------ @@ -263,48 +401,23 @@ def load_sample_data(table, context, tag=None, redis_protocol=False): import redbiom.util config = redbiom.get_config() - post = redbiom._requests.make_post(config, redis_protocol=redis_protocol) get = redbiom._requests.make_get(config) + post = redbiom._requests.make_post(config) redbiom._requests.valid(context, get) table = _stage_for_load(table, context, get, tag) samples = table.ids()[:] - obs = table.ids(axis='observation') - - obs_index = {} - for id_ in obs: - obs_index[id_] = get_index(context, id_, 'feature') - - samp_index = {} - for id_ in samples: - samp_index[id_] = get_index(context, id_, 'sample') - - # load up per-sample - for values, id_, _ in table.iter(dense=False): - int_values = values.astype(int) - remapped = [obs_index[i] for i in obs[values.indices]] - - packed = '/'.join(["%d/%s" % (v, i) - for i, v in zip(remapped, - int_values.data)]) - post(context, 'LPUSH', 'sample:%s/%s' % (id_, packed)) - - payload = "samples-represented/%s" % '/'.join(samples) - post(context, 'SADD', payload) + obs = table.ids(axis='observation')[:] - # load up per-observation - for values, id_, md in table.iter(axis='observation', dense=False): - int_values = values.astype(int) - remapped = [samp_index[i] for i in samples[values.indices]] + obs_index = {i: j for i, j in zip(obs, get_index(context, obs, 'feature'))} + samp_index = {i: j for i, j in + zip(samples, get_index(context, samples, 'sample'))} - packed = '/'.join(["%d/%s" % (v, i) - for i, v in zip(remapped, - int_values.data)]) - post(context, 'LPUSH', 'feature:%s/%s' % (id_, packed)) - - payload = "features-represented/%s" % '/'.join(obs) - post(context, 'SADD', payload) + _load_axis_data(table, samples, obs, obs_index, 'sample', context, + 
+                    batchsize=batchsize)
+    _load_axis_data(table, obs, samples, samp_index, 'feature', context,
+                    batchsize=batchsize)

     # load up taxonomy
     taxonomy = _metadata_to_taxonomy_tree(table.ids(axis='observation'),
@@ -615,17 +728,19 @@ def _stage_for_load(table, context, get, tag=None):
     return table.filter(lambda v, i, md: v.sum() > 0, axis='observation')


-def get_index(context, key, axis):
+def get_index(context, keys, axis, batchsize=100):
     """Get a unique integer value for a key within a context

     Parameters
     ----------
     context : str
         The context to operate in
-    key : str
-        The key to get a unique index for
+    keys : list or tuple of str
+        The keys to get a unique index for
     axis : str
         Either feature or sample
+    batchsize : int, optional
+        The number of IDs to query at once

     Notes
     -----
@@ -638,24 +753,28 @@ def get_or_set(d, item):

     Returns
     -------
-    int
-        A unique integer index within the context for the key
+    list of int
+        The unique integer indices within the context for the keys. These
+        are returned in index order with keys.
     """
     import redbiom
+    import json
    import redbiom._requests

     config = redbiom.get_config()

-    # we need to issue the request directly as the command structure is
-    # rather different than other commands
-    s = redbiom._requests.get_session()
-    sha = ScriptManager.get('get-index')
-    url = '/'.join([config['hostname'], 'EVALSHA', sha,
-                    '1', "%s:%s-index" % (context, axis), key])
-    req = s.get(url)
-
-    if req.status_code != 200:
-        raise ValueError("Unable to obtain index; %d; %s" % (req.status_code,
-                                                             req.content))
-
-    return int(req.json()['EVALSHA'])
+    post = redbiom._requests.make_post(config)
+    indexer_sha = ScriptManager.get('get-index')
+    context_axis = "%s:%s-index" % (context, axis)
+
+    indices = []
+    splits = max(1, ceil(len(keys) / batchsize))
+    for batch in np.array_split(keys, splits):
+        # the redis EVALSHA command structure requires specifying how many
+        # keys there are, which is always 1 in this case
+        nkeys = '1'
+        payload = [indexer_sha, nkeys, context_axis] + list(batch)
+        data = json.loads(post(None, 'EVALSHA', '/'.join(payload)))
+        indices.extend(data)
+
+    return indices

diff --git a/redbiom/tests/test_admin.py b/redbiom/tests/test_admin.py
index 0ebc6c2..5f3d627 100644
--- a/redbiom/tests/test_admin.py
+++ b/redbiom/tests/test_admin.py
@@ -118,15 +118,28 @@ def test_metadata_to_taxonomy_tree(self):
         obs = redbiom.admin._metadata_to_taxonomy_tree(*input)
         self.assertEqual(obs.compare_subsets(exp), 0.0)

-    def test_get_index(self):
+    def test_get_index_singular(self):
         context = 'load-features-test'
         redbiom.admin.create_context(context, 'foo')

         tests = [('A', 0), ('A', 0), ('B', 1), ('C', 2),
                  ('B', 1), ('Z', 3), ('A', 0)]

         for key, exp in tests:
-            obs = redbiom.admin.get_index(context, key, 'feature')
-            self.assertEqual(obs, exp)
+            obs = redbiom.admin.get_index(context, [key], 'feature')
+            self.assertEqual(len(obs), 1)
+            self.assertEqual(obs[0], exp)
+
+    def test_get_index_batch(self):
+        context = 'load-features-test'
+        redbiom.admin.create_context(context, 'foo')
+
+        tests = [('A', 0), ('A', 0), ('B', 1), ('C', 2),
+                 ('B', 1), ('Z', 3), ('A', 0)]
+
+        keys = [a for a, _ in tests]
+        exp = [b for _, b in tests]
+        obs = redbiom.admin.get_index(context, keys, 'feature')
+        self.assertEqual(obs, exp)

     def test_create_context(self):
         obs = self.get('state', 'HGETALL', 'contexts')
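
The "load-data" path packs each row's nonzero values as alternating count and
opposite-axis index, joined on "|". A minimal sketch of that round trip, with
hypothetical pack_axis/unpack helpers standing in for the packing loop in
_load_axis_data and the string.gmatch loop in the Lua script:

    import numpy as np

    def pack_axis(counts, opposite_indices):
        # interleave (count, index) pairs into the '|'-delimited payload,
        # matching the '%d|%d' formatting used in _load_axis_data
        return '|'.join("%d|%d" % (v, i)
                        for i, v in zip(opposite_indices, counts))

    def unpack(packed):
        # mirror of the string.gmatch("([^|]+)") split in "load-data"
        return [int(x) for x in packed.split('|')]

    counts = np.array([3, 1, 7])    # nonzero values for one sample
    indices = np.array([0, 4, 9])   # integer IDs of the opposite axis
    payload = pack_axis(counts, indices)
    assert payload == "3|0|1|4|7|9"
    assert unpack(payload) == [3, 0, 1, 4, 7, 9]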
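
The stack-limit workaround in the Lua helper is plain fixed-size slicing. A
Python analogue of call_in_chunks, assuming the same 7900-element step (the
default Lua stack size of 8000, minus headroom):

    def call_in_chunks(command, key, args, step=7900):
        # issue command(key, ...) repeatedly so that no single call receives
        # more than `step` arguments; `command` is any callable standing in
        # for redis.call
        for i in range(0, len(args), step):
            command(key, *args[i:i + step])

    calls = []
    call_in_chunks(lambda key, *vals: calls.append(len(vals)),
                   'ctx:sample:S1', list(range(20000)))
    assert calls == [7900, 7900, 4200]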
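
Both get_index and _load_axis_data derive the number of batches with
ceil(len(ids) / batchsize) and hand the list to np.array_split, which balances
batch sizes rather than filling each batch to capacity, so every batch stays
at or under batchsize. For example:

    from math import ceil
    import numpy as np

    ids = ["s%d" % i for i in range(25)]
    batchsize = 10
    splits = max(1, ceil(len(ids) / batchsize))
    batches = np.array_split(ids, splits)
    assert [len(b) for b in batches] == [9, 8, 8]
    assert all(len(b) <= batchsize for b in batches)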
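
Under Webdis, a command is expressed as '/'-separated path segments, which is
why both callers build their payloads with '/'.join before posting. For the
batched "get-index" call the layout reduces to the script SHA1, the key count
(always 1, the index hash), the hash name, and the identifiers; the values
below are placeholders, not a real SHA or real IDs:

    import json

    sha = "abc123"                  # stand-in for ScriptManager.get('get-index')
    context_axis = "my-context:feature-index"
    batch = ["asv1", "asv2", "asv3"]

    payload = '/'.join([sha, '1', context_axis] + batch)
    assert payload == "abc123/1/my-context:feature-index/asv1/asv2/asv3"
    # the script replies with a JSON array of integer indices, e.g.
    # "[0, 1, 2]", which get_index decodes with json.loads
    assert json.loads("[0, 1, 2]") == [0, 1, 2]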