Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deque for signal aggregation #22

Merged
merged 9 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ Changelog
=========

Version 0.1.0
DomFijan marked this conversation as resolved.
Show resolved Hide resolved
=============
-------------

[0.1.0] -- 2024-01-06
---------------------
* Added the possibility for the aggregator to have a constant signal length


Version 0.0.1 -- 2024-01-06
----------------------------

First official release.
24 changes: 20 additions & 4 deletions dupin/data/aggregate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Helper module for generating/storing features accross an entire trajectory.
"""Helper module for generating/storing features across an entire trajectory.

This class provides the `SignalAggregator` class which takes a pipeline and
provides methods for storing the output across a trajectory.
"""

from collections import deque
from collections.abc import Iterator
from typing import Any, Optional

Expand All @@ -28,7 +29,10 @@ class SignalAggregator:
This class can be used to create appropriate data structures for use in
analyzing a whole trajectory with offline methods or iteratively
analyzing for online use. See the `compute` and `accumulate` methods for
usage.
usage. The signal is stored in a deque, which allows for signal to be of
constant length by forgetting earlier values. This is useful for online
DomFijan marked this conversation as resolved.
Show resolved Hide resolved
detection. Full signal length can be achieved by setting `max_deque_length`
to `None`.

Parameters
----------
Expand All @@ -37,7 +41,9 @@ class SignalAggregator:
multivariate signal of a trajectory.
logger : dupin.data.logging.Logger
A logger object to store information about the data processing of
the given pipeline. Defaults to ``None``.
max_deque_length : int, optional
The maximum length of the deque. If None, the deque will be of infinite
size (like a list) and whole signal will be stored.

Attributes
----------
Expand All @@ -51,9 +57,14 @@ def __init__(
self,
generator: base.GeneratorLike,
logger: Optional[logging.Logger] = None,
max_deque_length=None,
):
self.generator = generator
self.signals = []
self._max_deque_length = max_deque_length
if max_deque_length is not None:
self.signals = deque(maxlen=max_deque_length)
else:
self.signals = []
self.logger = logger

def compute(
Expand Down Expand Up @@ -205,6 +216,11 @@ def logger(self):
"""dupin.data.logging.Logger: Logger for the aggregator."""
return self._logger

@property
def max_deque_length(self):
"""int: The maximum length of the deque."""
return self._max_deque_length

@logger.setter
def logger(self, new_logger):
self._logger = new_logger
Expand Down
22 changes: 22 additions & 0 deletions tests/data/test_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ def yield_for_generator():
assert all(k in dict_ for k in keys for dict_ in instance.signals)


@given(n_frames(), keys(), data())
def test_compute_no_args_deque(n_frames, keys, data):
"""Test compute works with correct iterator."""
schema = fixed_dictionaries({k: floats() for k in keys})

def generator():
return data.draw(schema)

deque_length = 5

instance = du.data.aggregate.SignalAggregator(
generator, max_deque_length=deque_length
)
for ii, _ in enumerate(range(n_frames)):
instance.accumulate()
if ii >= deque_length:
assert len(instance.signals) == deque_length
else:
assert len(instance.signals) == ii + 1
assert all(k in dict_ for k in keys for dict_ in instance.signals)


@given(n_frames(), keys(), data())
def test_compute_with_args(n_frames, keys, data):
"""Test compute works with correct iterator."""
Expand Down