Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganize #105

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bolt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from bolt.factory import array, ones, zeros, concatenate
from bolt.array.construct import array, ones, zeros, concatenate

__version__ = '0.7.1'
File renamed without changes.
72 changes: 37 additions & 35 deletions bolt/spark/array.py → bolt/array/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
r_, sort, argsort, array, random, arange, ones, expand_dims, sum
from itertools import groupby

from bolt.base import BoltArray
from bolt.spark.stack import StackedArray
from bolt.spark.utils import zip_with_index
from bolt.spark.statcounter import StatCounter
from bolt.array.stack import StackedArray
from bolt.array.utils import zip_with_index
from bolt.array.statcounter import StatCounter
from bolt.utils import slicify, listify, tupleize, argpack, inshape, istransposeable, isreshapeable


class BoltArraySpark(BoltArray):
class BoltArray(object):

_metadata = {
'_shape': None,
Expand All @@ -24,12 +23,25 @@ def __init__(self, rdd, shape=None, split=None, dtype=None, ordered=True):
self._shape = shape
self._split = split
self._dtype = dtype
self._mode = 'spark'
self._ordered = ordered

def __finalize__(self, other):
if isinstance(other, BoltArray):
for name in self._metadata:
other_attr = getattr(other, name, None)
if (other_attr is not self._metadata[name]) \
and (getattr(self, name, None) is self._metadata[name]):
object.__setattr__(self, name, other_attr)
return self

def __repr__(self):
s = "BoltArray\n"
s += "shape: %s\n" % str(self.shape)
return s

@property
def _constructor(self):
return BoltArraySpark
return BoltArray

def __array__(self):
return self.toarray()
Expand Down Expand Up @@ -98,7 +110,7 @@ def _align(self, axis):

Returns
-------
BoltArraySpark
BoltArray
"""
# ensure that the specified axes are valid
inshape(self.shape, axis)
Expand Down Expand Up @@ -149,7 +161,7 @@ def map(self, func, axis=(0,), value_shape=None, dtype=None, with_keys=False):

Returns
-------
BoltArraySpark
BoltArray
"""
axis = tupleize(axis)
swapped = self._align(axis)
Expand Down Expand Up @@ -212,7 +224,7 @@ def filter(self, func, axis=(0,), sort=False):

Returns
-------
BoltArraySpark
BoltArray
"""
axis = tupleize(axis)

Expand Down Expand Up @@ -259,9 +271,8 @@ def reduce(self, func, axis=(0,), keepdims=False):

Returns
-------
BoltArraySpark
BoltArray
"""
from bolt.local.array import BoltArrayLocal
from numpy import ndarray

axis = tupleize(axis)
Expand All @@ -279,7 +290,7 @@ def reduce(self, func, axis=(0,), keepdims=False):
# ndarrays with single values in them should be converted into scalars
return arr[0]

return BoltArrayLocal(arr)
return arr

def _stat(self, axis=None, func=None, name=None, keepdims=False):
"""
Expand All @@ -295,7 +306,7 @@ def _stat(self, axis=None, func=None, name=None, keepdims=False):
will compute over all axes

func : function, optional, default=None
Function for reduce, see BoltArraySpark.reduce
Function for reduce, see BoltArray.reduce

name : str
A named statistic, see StatCounter
Expand All @@ -311,8 +322,6 @@ def _stat(self, axis=None, func=None, name=None, keepdims=False):
return self.reduce(func, axis, keepdims)

if name and not func:
from bolt.local.array import BoltArrayLocal

swapped = self._align(axis)

def reducer(left, right):
Expand All @@ -328,7 +337,7 @@ def reducer(left, right):
for i in axis:
arr = expand_dims(arr, axis=i)

return BoltArrayLocal(arr).toscalar()
return arr

else:
raise ValueError('Must specify either a function or a statistic name.')
Expand Down Expand Up @@ -432,21 +441,21 @@ def concatenate(self, arry, axis=0):

Parameters
---------
arry : ndarray, BoltArrayLocal, or BoltArraySpark
arry : ndarray, or BoltArray
Another array to concatenate with

axis : int, optional, default=0
The axis along which arrays will be joined.

Returns
-------
BoltArraySpark
BoltArray
"""
if isinstance(arry, ndarray):
from bolt.spark.construct import ConstructSpark
arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split))
from bolt.array.construct import array
arry = array(arry, self._rdd.context, axis=range(0, self.split))
else:
if not isinstance(arry, BoltArraySpark):
if not isinstance(arry, BoltArray):
raise ValueError("other must be local array or spark array, got %s" % type(arry))

if not all([x == y if not i == axis else True
Expand Down Expand Up @@ -708,7 +717,7 @@ def chunk(self, size="150", axis=None, padding=None):
axis = tupleize((axis))
padding = tupleize((padding))

from bolt.spark.chunk import ChunkedArray
from bolt.array.chunk import ChunkedArray

chnk = ChunkedArray(rdd=self._rdd, shape=self._shape, split=self._split, dtype=self._dtype)
return chnk._chunk(size, axis, padding)
Expand Down Expand Up @@ -739,7 +748,7 @@ def swap(self, kaxes, vaxes, size="150"):

Returns
-------
BoltArraySpark
BoltArray
"""
kaxes = asarray(tupleize(kaxes), 'int')
vaxes = asarray(tupleize(vaxes), 'int')
Expand All @@ -753,7 +762,7 @@ def swap(self, kaxes, vaxes, size="150"):
if len(kaxes) == 0 and len(vaxes) == 0:
return self

from bolt.spark.chunk import ChunkedArray
from bolt.array.chunk import ChunkedArray

chunks = self.chunk(size)

Expand Down Expand Up @@ -853,7 +862,7 @@ def reshape(self, *shape):
i = self._reshapebasic(new)
if i == -1:
raise NotImplementedError("Currently no support for reshaping between "
"keys and values for BoltArraySpark")
"keys and values for BoltArray")
else:
new_key_shape, new_value_shape = new[:i], new[i:]
return self.keys.reshape(new_key_shape).values.reshape(new_value_shape)
Expand Down Expand Up @@ -988,21 +997,14 @@ def keys(self):
"""
Returns a restricted view of the keys.
"""
from bolt.spark.shapes import Keys
from bolt.array.shapes import Keys
return Keys(self)

@property
def values(self):
from bolt.spark.shapes import Values
from bolt.array.shapes import Values
return Values(self)

def tolocal(self):
"""
Returns a local bolt array by first collecting as an array.
"""
from bolt.local.array import BoltArrayLocal
return BoltArrayLocal(self.toarray())

def toarray(self):
"""
Returns the contents as a local array.
Expand Down
10 changes: 5 additions & 5 deletions bolt/spark/chunk.py → bolt/array/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from itertools import product

from bolt.utils import tuplesort, tupleize, allstack, iterexpand
from bolt.spark.array import BoltArraySpark
from bolt.array.array import BoltArray


class ChunkedArray(object):
"""
Wraps a BoltArraySpark and provides an interface for chunking
Wraps a BoltArray and provides an interface for chunking
into subarrays and performing operations on chunks. Many methods will
be restricted until the chunked array is unchunked.

Expand Down Expand Up @@ -196,7 +196,7 @@ def _unchunk(it):
else:
newshape = self.shape

return BoltArraySpark(rdd, shape=newshape, split=self._split,
return BoltArray(rdd, shape=newshape, split=self._split,
dtype=self.dtype, ordered=ordered)

def keys_to_values(self, axes, size=None):
Expand Down Expand Up @@ -416,7 +416,7 @@ def map_generic(self, func):
"""
Apply a generic array -> object to each subarray

The resulting object is a BoltArraySpark of dtype object where the
The resulting object is a BoltArray of dtype object where the
blocked dimensions are replaced with indices indicating block ID.
"""
def process_record(val):
Expand All @@ -429,7 +429,7 @@ def process_record(val):
nchunks = self.getnumber(self.plan, self.vshape)
newshape = tuple([int(s) for s in r_[self.kshape, nchunks]])
newsplit = len(self.shape)
return BoltArraySpark(rdd, shape=newshape, split=newsplit, ordered=self._ordered, dtype="object")
return BoltArray(rdd, shape=newshape, split=newsplit, ordered=self._ordered, dtype="object")

def getplan(self, size="150", axes=None, padding=None):
"""
Expand Down
Loading