Support for large numbers of features (#103)
* TST: bulk get index test

* Batch index requests

* A little cleanup on debug items

* Additional comments

* sty

* Update redbiom/admin.py

Co-authored-by: Antonio Gonzalez <[email protected]>

* BUG: fixes #108, thanks @cotillau!

* VER: actually bump it, partially resolves #107, thanks @BenKaehler

* tentative py3738 support

* update actions

* update actions

* update actions

* update actions

* update actions

* update actions

* update actions

* update actions

* update actions

* update actions

* update actions

* update actions

* update actions

* BUG: fixes #93

* BUG: fixes #92

* remove unhelpful print

* Bump version

* Force json (#113)

* TST: sample id content type bug

* MAINT: fix issue where samples with .raw as a suffix were triggering unexpected returns

* Don't suffix twice

* Adjust to account for force of json

* Address @antgonza's comments

* use the right variable name

* Verify data appear count

* Verify data appear count

Co-authored-by: Antonio Gonzalez <[email protected]>
wasade and antgonza authored Oct 21, 2021
1 parent 7694b50 commit d4af5a2
Showing 3 changed files with 202 additions and 64 deletions.
8 changes: 7 additions & 1 deletion redbiom/_requests.py
@@ -53,9 +53,14 @@ def f(context, cmd, payload):
sys.stdout.write(proto)
sys.stdout.flush()
else:
def f(context, cmd, payload):
def f(context, cmd, payload, verbose=False):
req = s.post(config['hostname'],
data=_format_request(context, cmd, payload))

if verbose:
print(context, cmd, payload[:100])
print(req.status_code)
print(req.content)
return _parse_validate_request(req, cmd)
return f

@@ -103,6 +108,7 @@ def f(sha, *args):
payload.extend([str(a) for a in args])
url = '/'.join(payload)
return json.loads(_parse_validate_request(s.get(url), 'EVALSHA'))

return f


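For context, the new verbose flag is threaded through the HTTP POST helper so individual requests can be inspected while debugging. A minimal usage sketch, assuming a valid redbiom configuration and a reachable Webdis endpoint (the context name and payload below are illustrative):

    import redbiom
    import redbiom._requests

    config = redbiom.get_config()
    post = redbiom._requests.make_post(config)

    # with verbose=True, the helper prints the context, the command, the
    # first 100 characters of the payload, and the HTTP status and body
    # before parsing the response
    post('example-context', 'SADD', 'samples-represented/sampleA',
         verbose=True)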
239 changes: 179 additions & 60 deletions redbiom/admin.py
@@ -1,4 +1,6 @@
from urllib.parse import quote_plus as _quote_plus
from math import ceil
import numpy as np


def quote_plus(s):
@@ -9,13 +11,58 @@ class ScriptManager:
"""Static singleton for managing Lua scripts in the Redis backend"""
# derived from http://stackoverflow.com/a/43900922/19741
_scripts = {'get-index': """
local kid = redis.call('HGET', KEYS[1], ARGV[1])
if not kid then
kid = redis.call('HINCRBY', KEYS[1], 'current_id', 1) - 1
redis.call('HSET', KEYS[1], ARGV[1], kid)
redis.call('HSET', KEYS[1] .. '-inverted', kid, ARGV[1])
local indices = {}
local kid = nil
-- for each index and identifier (like python's enumerate)
for position, name in ipairs(ARGV) do
kid = redis.call('HGET', KEYS[1], name)
-- if an identifier was not observed, add it
if not kid then
kid = redis.call('HINCRBY',
KEYS[1],
'current_id', 1) - 1
redis.call('HSET', KEYS[1], name, kid)
redis.call('HSET', KEYS[1] .. '-inverted', kid, name)
end
-- store the mapping for return
indices[position] = tonumber(kid)
end
return cjson.encode(indices)""",
'load-data': """
-- Redis has a compile time stack limit for Lua calls
-- so rather than recompiling with an arbitrary limit,
-- we're going to instead chunk calls where there are a
-- large number of arguments. The default is 8000 for the
-- stack size, so we'll use 7900 to be close without
-- going over
-- https://stackoverflow.com/a/39959618/19741
local call_in_chunks = function (command, key, args)
local step = 7900
for i = 1, #args, step do
redis.call(command,
key,
unpack(args,
i,
math.min(i + step - 1, #args)))
end
end
-- Lua does not have a natural split, for various reasons
-- outlined in the URL below, so we need to do this
-- manually. We'll split on "|" which should be safe
-- as the values sent are only ever expected to be integers
-- http://lua-users.org/wiki/SplitJoin
for idx, arg in ipairs(ARGV) do
local items = {}
for item in string.gmatch(arg, "([^|]+)") do
table.insert(items, item)
end
call_in_chunks('LPUSH', KEYS[idx], items)
end
return kid""",
return redis.status_reply("OK")""",
'fetch-feature': """
local context = ARGV[1]
local key = ARGV[2]
@@ -62,7 +109,7 @@ class ScriptManager:
end
return cjson.encode(result)"""}
_admin_scripts = ('get-index', )
_admin_scripts = ('get-index', 'load-data')
_cache = {}

@staticmethod
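For orientation, the reworked 'get-index' script takes one Redis key (the per-context index hash) plus any number of identifiers as arguments, and returns a JSON-encoded array of their integer indices, assigning new indices on first observation. A sketch of the call as issued through the Webdis POST helper (the context and identifiers are illustrative):

    import json
    import redbiom
    import redbiom._requests
    from redbiom.admin import ScriptManager

    config = redbiom.get_config()
    post = redbiom._requests.make_post(config)
    sha = ScriptManager.get('get-index')

    # EVALSHA <sha> 1 <context>:<axis>-index <name> <name> ...
    # KEYS[1] is the identifier -> integer hash; ARGV holds the names
    payload = [sha, '1', 'example-context:feature-index', 'featA', 'featB']
    indices = json.loads(post(None, 'EVALSHA', '/'.join(payload)))
    # e.g., [0, 1] the first time featA and featB are observed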
@@ -209,7 +256,96 @@ def create_context(name, description):
ScriptManager.load_scripts()


def load_sample_data(table, context, tag=None, redis_protocol=False):
def _load_axis_data(table, ids, opposite_ids, opposite_id_index, axis_label,
context, batchsize):
"""Manage the loading of data for a particular axis
Parameters
----------
table : biom.Table
The table to obtain data from
ids : iterable of str
The IDs to obtain data for
opposite_ids : iterable of str
The IDs of the opposite axis in the table
opposite_id_index : dict
The index which maps an opposite ID to the index value within
the Redis database for the identifier
axis_label : str
The biom.Table axis label of ids
context : str
The context to load the data into
batchsize : int
The number of identifiers to group into a single request

Notes
-----
This method only supports count data.
Data are loaded through the "load-data" Lua script managed in the
ScriptsManager. This method in effect packs the data into a structure
compatible with Webdis, and the EVALSHA command structure of Redis. The
"load-data" script then iterates over the "KEYS" and "ARGV"s, parsing
the respective entries into values that can be directly loaded.

Redis command summary
---------------------
EVALSHA <load-data-sha1> N <context>:<axis_label>:<id> ... <packeddata> ...
Note that "N" refers to the number of "KEYS". The "load-data" Lua script
assumes that there are "N" "KEYS" as well as "N" "ARGV"s. For the call,
"KEYS" are the prefixed identifiers (e.g., "<context>:<axis_label>:<id>")
and "ARGV" are the "packeddata". "KEYS" and "ARGV" are expected to be in
index order with each other.
"""
import redbiom
import redbiom._requests
if axis_label == 'feature':
axis = 'observation'
elif axis_label == 'sample':
axis = 'sample'
else:
raise ValueError("%s is unrecognized as an axis" % axis_label)

config = redbiom.get_config()
post = redbiom._requests.make_post(config)
loader_sha = ScriptManager.get('load-data')

# partition our IDs into smaller batches
splits = max(1, ceil(len(ids) / batchsize))
for batch in np.array_split(ids, splits):
keys = []
argv = []

# pack the id specific data into a format the Lua logic expects
for id_ in batch:
values = table.data(id_, axis=axis, dense=False)
if not np.allclose(values.data - np.round(values.data), 0.0):
raise ValueError("Data do not appear to be counts")

int_values = values.astype(int)
remapped = [opposite_id_index[i]
for i in opposite_ids[values.indices]]

packed = '|'.join(["%d|%d" % (v, i)
for i, v in zip(remapped, int_values.data)])

keys.append(f"{context}:{axis_label}:{id_}")
argv.append(packed)

nkeys = str(len(keys))

# load the count data
payload = [loader_sha, nkeys] + keys + argv
post(None, 'EVALSHA', '/'.join(payload))

# note which identifiers are represented
payload = f"{axis_label}s-represented/%s" % '/'.join(batch)
post(context, 'SADD', payload, verbose=False)
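To make the packed format concrete: each identifier's nonzero vector is serialized as count and opposite-axis index pairs joined on "|", and the 'load-data' script splits the string back apart and LPUSHes the items in chunks that stay under the Lua stack limit. A small illustration with made-up values:

    # hypothetical nonzero counts for one sample, with the Redis integer
    # indices of the corresponding features
    counts = [10, 3, 7]        # int_values.data
    feature_idx = [0, 42, 7]   # remapped opposite-axis indices

    packed = '|'.join(["%d|%d" % (v, i)
                       for i, v in zip(feature_idx, counts)])
    # packed == "10|0|3|42|7|7"
    # the Lua side splits on "|" and LPUSHes the items onto
    # <context>:sample:<id> in chunks of at most 7900 arguments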


def load_sample_data(table, context, tag=None, redis_protocol=False,
batchsize=1000):
"""Load nonzero sample data.
Parameters
Expand All @@ -222,6 +358,8 @@ def load_sample_data(table, context, tag=None, redis_protocol=False):
A tag to associate the samples with (e.g., a preparation ID).
redis_protocol : bool, optional
Generate commands for bulk load instead of HTTP requests.
batchsize : int, optional
The number of samples or features to load at once
Raises
------
@@ -263,48 +401,23 @@ def load_sample_data(table, context, tag=None, redis_protocol=False):
import redbiom.util

config = redbiom.get_config()
post = redbiom._requests.make_post(config, redis_protocol=redis_protocol)
get = redbiom._requests.make_get(config)
post = redbiom._requests.make_post(config)

redbiom._requests.valid(context, get)

table = _stage_for_load(table, context, get, tag)
samples = table.ids()[:]
obs = table.ids(axis='observation')

obs_index = {}
for id_ in obs:
obs_index[id_] = get_index(context, id_, 'feature')

samp_index = {}
for id_ in samples:
samp_index[id_] = get_index(context, id_, 'sample')

# load up per-sample
for values, id_, _ in table.iter(dense=False):
int_values = values.astype(int)
remapped = [obs_index[i] for i in obs[values.indices]]

packed = '/'.join(["%d/%s" % (v, i)
for i, v in zip(remapped,
int_values.data)])
post(context, 'LPUSH', 'sample:%s/%s' % (id_, packed))

payload = "samples-represented/%s" % '/'.join(samples)
post(context, 'SADD', payload)
obs = table.ids(axis='observation')[:]

# load up per-observation
for values, id_, md in table.iter(axis='observation', dense=False):
int_values = values.astype(int)
remapped = [samp_index[i] for i in samples[values.indices]]
obs_index = {i: j for i, j in zip(obs, get_index(context, obs, 'feature'))}
samp_index = {i: j for i, j in
zip(samples, get_index(context, samples, 'sample'))}

packed = '/'.join(["%d/%s" % (v, i)
for i, v in zip(remapped,
int_values.data)])
post(context, 'LPUSH', 'feature:%s/%s' % (id_, packed))

payload = "features-represented/%s" % '/'.join(obs)
post(context, 'SADD', payload)
_load_axis_data(table, samples, obs, obs_index, 'sample', context,
batchsize=10)
_load_axis_data(table, obs, samples, samp_index, 'feature', context,
batchsize=500)

# load up taxonomy
taxonomy = _metadata_to_taxonomy_tree(table.ids(axis='observation'),
@@ -615,17 +728,19 @@ def _stage_for_load(table, context, get, tag=None):
return table.filter(lambda v, i, md: v.sum() > 0, axis='observation')


def get_index(context, key, axis):
def get_index(context, keys, axis, batchsize=100):
"""Get a unique integer value for a key within a context
Parameters
----------
context : str
The context to operate in
key : str
The key to get a unique index for
keys : list or tuple of str
The keys to get a unique index for
axis : str
Either feature or sample
batchsize : int, optional
The number of IDs to query at once

Notes
-----
@@ -638,24 +753,28 @@ def get_or_set(d, item):
Returns
-------
int
A unique integer index within the context for the key
list of int
The unique integer indices within the context for the keys. This is
returned in index order with keys.
"""
import redbiom
import json
import redbiom._requests

config = redbiom.get_config()

# we need to issue the request directly as the command structure is
# rather different than other commands
s = redbiom._requests.get_session()
sha = ScriptManager.get('get-index')
url = '/'.join([config['hostname'], 'EVALSHA', sha,
'1', "%s:%s-index" % (context, axis), key])
req = s.get(url)

if req.status_code != 200:
raise ValueError("Unable to obtain index; %d; %s" % (req.status_code,
req.content))

return int(req.json()['EVALSHA'])
post = redbiom._requests.make_post(config)
indexer_sha = ScriptManager.get('get-index')
context_axis = "%s:%s-index" % (context, axis)

indices = []
splits = max(1, ceil(len(keys) / batchsize))
for batch in np.array_split(keys, splits):
# the redis EVALSHA command structure requires specifying how many keys
# there are, which is always 1 in this case.
nkeys = '1'
payload = [indexer_sha, nkeys, context_axis] + list(batch)
data = json.loads(post(None, 'EVALSHA', '/'.join(payload)))
indices.extend(data)

return indices
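As exercised by the tests below, the reworked get_index accepts a collection of identifiers and returns their indices in input order. A usage sketch against an existing context (names are illustrative):

    import redbiom.admin

    # assumes a context named 'example-context' has already been created
    indices = redbiom.admin.get_index('example-context',
                                      ['A', 'B', 'A', 'Z'], 'feature')
    # index-ordered with the input, e.g., [0, 1, 0, 2]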
19 changes: 16 additions & 3 deletions redbiom/tests/test_admin.py
@@ -118,15 +118,28 @@ def test_metadata_to_taxonomy_tree(self):
obs = redbiom.admin._metadata_to_taxonomy_tree(*input)
self.assertEqual(obs.compare_subsets(exp), 0.0)

def test_get_index(self):
def test_get_index_singular(self):
context = 'load-features-test'
redbiom.admin.create_context(context, 'foo')

tests = [('A', 0), ('A', 0), ('B', 1), ('C', 2),
('B', 1), ('Z', 3), ('A', 0)]
for key, exp in tests:
obs = redbiom.admin.get_index(context, key, 'feature')
self.assertEqual(obs, exp)
obs = redbiom.admin.get_index(context, [key, ], 'feature')
self.assertEqual(len(obs), 1)
self.assertEqual(obs[0], exp)

def test_get_index_batch(self):
context = 'load-features-test'
redbiom.admin.create_context(context, 'foo')

tests = [('A', 0), ('A', 0), ('B', 1), ('C', 2),
('B', 1), ('Z', 3), ('A', 0)]

keys = [a for a, _ in tests]
exp = [b for _, b in tests]
obs = redbiom.admin.get_index(context, keys, 'feature')
self.assertEqual(obs, exp)

def test_create_context(self):
obs = self.get('state', 'HGETALL', 'contexts')
