Skip to content

Commit

Permalink
Add calc_adc_segments for segmentation of long ADC. (imr-framework#196)
Browse files Browse the repository at this point in the history
* Add ADC samples limit and divisor.

* Correct typing.

* Add calc_adc_segments

* Add test for calc_adc_segments and expected output generated by MATLAB calcAdcSeg

* Manage import of calc_adc_segments

* Manage import of calc_adc_segments

* Apply changes from PR review

* Replace math.lcm with custom _lcm
  • Loading branch information
felixlandmeyer authored Sep 3, 2024
1 parent 6d4f59b commit 9130a82
Show file tree
Hide file tree
Showing 5 changed files with 72,274 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pypulseq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def round_half_up(n, decimals=0):
from pypulseq.calc_ramp import calc_ramp
from pypulseq.calc_rf_bandwidth import calc_rf_bandwidth
from pypulseq.calc_rf_center import calc_rf_center
from pypulseq.make_adc import make_adc
from pypulseq.make_adc import make_adc, calc_adc_segments
from pypulseq.make_adiabatic_pulse import make_adiabatic_pulse
from pypulseq.make_arbitrary_grad import make_arbitrary_grad
from pypulseq.make_arbitrary_rf import make_arbitrary_rf
Expand Down
159 changes: 158 additions & 1 deletion pypulseq/make_adc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from types import SimpleNamespace

from typing import Optional, Tuple, List
from math import isclose, floor, ceil ,gcd, prod
import itertools
import numpy as np
from pypulseq.opts import Opts


Expand Down Expand Up @@ -67,3 +70,157 @@ def make_adc(
adc.delay = adc.dead_time

return adc

def calc_adc_segments(
num_samples: int,
dwell: float,
system: Optional[Opts] = None,
mode: str = 'lengthen'
) -> Tuple[int, int]:
"""Calculate splitting of the ADC in segments with equal samples.
Parameters
----------
num_samples : int
Initial number of samples to split into segments.
dwell : float
Dwell time of the ADC in [s]
system : Optional[Opts], default=None
System limits. Default is a system limits object initialised to default values.
mode : str, default='lengthen'
The total number of samples can either be shortened or lengthened to match the constraints.
Returns
-------
(num_segments, num_samples_seg): int, int
Number of segments and number of samples per segment.
Notes
-----
"On some scanners, notably Siemens, ADC objects that exceed a certain
sample length (8192 samples on Siemens) should be splittable to N
equal parts, each of which aligned to the gradient raster. Each
segment, however, needs to have the number of samples smaller than
system.adcSamplesLimit' and divisible by system.adcSamplesDivisor to be
executable on the scanner. The optional parameter mode can be either
'shorten' or 'lengthen'." [Matlab implementation of 'calcAdcSeg'. https://github.com/pulseq]
Raises
------
ValueError
If 'mode' is not 'shorten' or 'lengthen'.
ValueError
If no suitable segmentation could be found.
ValueError
If number of segments exceeds 128.
"""
# Define maximum number of segments for the ADC
MAX_SEGMENTS = 128

if mode not in ['shorten', 'lengthen']:
raise ValueError(
f"'mode' must be 'shorten' or 'lengthen' but is {mode}"
)

if system is None:
system = Opts.default

# Check if single adc is sufficient
if system.adc_samples_limit <= 0:
return 1, num_samples

# Get minimum number of samples for which the adc duration
# is a multiple of grad raster time (GRT) and adc raster time (ART)
i_gr = round(system.grad_raster_time/system.adc_raster_time) # GRT in units of ART
if not isclose(system.grad_raster_time/system.adc_raster_time, i_gr):
raise ValueError(
"System 'grad_raster_time' is not a multiple of 'adc_raster_time'.")

i_dwell = round(dwell/system.adc_raster_time) # dwell in units of ART
if not isclose(dwell/system.adc_raster_time, i_dwell):
raise ValueError(
"'dwell' is not a multiple of system 'adc_raster_time'.")

i_common = _lcm(i_gr, i_dwell)
min_samples_segment = int(i_common/i_dwell) # lcm(a,b)/b is always int

# Siemens: Number of Samples should be divisible by a divisor
gcd_adcdiv = gcd(min_samples_segment, system.adc_samples_divisor)
if gcd_adcdiv != system.adc_samples_divisor:
min_samples_segment *= system.adc_samples_divisor/gcd_adcdiv

# Get segment multiplier
if mode == 'shorten':
samples_seg_multip = floor(num_samples/min_samples_segment)
else:
samples_seg_multip = ceil(num_samples/min_samples_segment)
while (samples_seg_multip > 0
and samples_seg_multip < (2*num_samples/min_samples_segment)):
# Get prime factors from segments multiplier
adc_seg_primes = _prime_factors(samples_seg_multip)
num_segments = 1
if len(adc_seg_primes) > 1:
num_segments_candids = set()
for k in range(1, len(adc_seg_primes)+1):
num_segments_candids |= set(prod(perm) for perm in itertools.combinations(adc_seg_primes, k))
# Find suitable candidate
for num_segments in sorted(num_segments_candids):
num_samples_seg = (
samples_seg_multip
* min_samples_segment
/ num_segments
)
if (num_samples_seg <= system.adc_samples_limit
and num_segments<=MAX_SEGMENTS):
break # Found segments and samples
else: # Only one possible solution
num_samples_seg = samples_seg_multip * min_samples_segment

# Does output already fulfill constraints?
if (num_samples_seg <= system.adc_samples_limit
and num_segments<=MAX_SEGMENTS):
break
else: # Shorten or lengthen the number of samples per segment
samples_seg_multip += (1 if mode=='lengthen' else -1)

# Validate constraints
if samples_seg_multip <= 0:
raise ValueError(
"Could not find suitable segmentation.")
if num_samples_seg == 0:
raise ValueError(
"Could not find suitable number of samples per segment.")
if num_segments > MAX_SEGMENTS:
raise ValueError(
f"Number of segments ({num_segments}) exceeds allowed number of {MAX_SEGMENTS}")

return int(num_segments), int(num_samples_seg)


def _prime_factors(n: int) -> List[int]:
"""Compute the prime factors of given integer n."""
if n == 1:
return [1]
else:
factors = []
# Dividing n by 2 until it's odd
while n % 2 == 0:
factors.append(2)
n //= 2

# Checking odd factors from 3 upwards
i = 3
while i * i <= n:
while n % i == 0:
factors.append(i)
n //= i
i += 2
if n > 2:
factors.append(n)

return factors

def _lcm(a, b):
"""Calculates the least common multiple of a and b."""
return (a*b)//gcd(a, b)

63 changes: 39 additions & 24 deletions pypulseq/opts.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Optional
from pypulseq.convert import convert


Expand Down Expand Up @@ -32,6 +33,10 @@ class Opts:
Raster time for radio-frequency pulses.
rf_ringdown_time : float, default=0
Ringdown time for radio-frequency pulses.
adc_samples_limit : int, default=0
Maximum number of samples for a single ADC object. If 0, no limit is set.
adc_samples_divisor : int, default=4
Samples of ADC must be divisible by 'adc_samples_divisor'.
rise_time : float, default=0
Rise time for gradients.
slew_unit : str, default='Hz/m/s'
Expand All @@ -47,20 +52,22 @@ class Opts:
"""
def __init__(
self,
adc_dead_time: float = None,
adc_raster_time: float = None,
block_duration_raster: float = None,
gamma: float = None,
grad_raster_time: float = None,
adc_dead_time: Optional[float] = None,
adc_raster_time: Optional[float] = None,
block_duration_raster: Optional[float] = None,
gamma: Optional[float] = None,
grad_raster_time: Optional[float] = None,
grad_unit: str = "Hz/m",
max_grad: float = None,
max_slew: float = None,
rf_dead_time: float = None,
rf_raster_time: float = None,
rf_ringdown_time: float = None,
rise_time: float = None,
max_grad: Optional[float] = None,
max_slew: Optional[float] = None,
rf_dead_time: Optional[float] = None,
rf_raster_time: Optional[float] = None,
rf_ringdown_time: Optional[float] = None,
adc_samples_limit: Optional[int] = None,
adc_samples_divisor: Optional[int] = None,
rise_time: Optional[float] = None,
slew_unit: str = "Hz/m/s",
B0: float = None,
B0: Optional[float] = None,
):
valid_grad_units = ["Hz/m", "mT/m", "rad/ms/mm"]
valid_slew_units = ["Hz/m/s", "mT/m/ms", "T/m/s", "rad/ms/mm/ms"]
Expand All @@ -77,42 +84,46 @@ def __init__(
f"Passed: {slew_unit}"
)

if gamma == None:
if gamma is None:
gamma = Opts.default.gamma

if max_grad != None:
if max_grad is not None:
max_grad = convert(
from_value=max_grad, from_unit=grad_unit, to_unit="Hz/m", gamma=abs(gamma)
)
else:
max_grad = Opts.default.max_grad

if max_slew != None:
if max_slew is not None:
max_slew = convert(
from_value=max_slew, from_unit=slew_unit, to_unit="Hz/m", gamma=abs(gamma)
)
else:
max_slew = Opts.default.max_slew

if rise_time != None:
if rise_time is not None:
max_slew = max_grad / rise_time

if adc_dead_time == None:
if adc_dead_time is None:
adc_dead_time = Opts.default.adc_dead_time
if adc_raster_time == None:
if adc_raster_time is None:
adc_raster_time = Opts.default.adc_raster_time
if block_duration_raster == None:
if block_duration_raster is None:
block_duration_raster = Opts.default.block_duration_raster

if rf_dead_time == None:
if rf_dead_time is None:
rf_dead_time = Opts.default.rf_dead_time
if rf_raster_time == None:
if rf_raster_time is None:
rf_raster_time = Opts.default.rf_raster_time
if grad_raster_time == None:
if grad_raster_time is None:
grad_raster_time = Opts.default.grad_raster_time
if rf_ringdown_time == None:
if rf_ringdown_time is None:
rf_ringdown_time = Opts.default.rf_ringdown_time
if B0 == None:
if adc_samples_limit is None:
adc_samples_limit = Opts.default.adc_samples_limit
if adc_samples_divisor is None:
adc_samples_divisor = Opts.default.adc_samples_divisor
if B0 is None:
B0 = Opts.default.B0

self.max_grad = max_grad
Expand All @@ -125,6 +136,8 @@ def __init__(
self.rf_raster_time = rf_raster_time
self.grad_raster_time = grad_raster_time
self.block_duration_raster = block_duration_raster
self.adc_samples_limit = adc_samples_limit
self.adc_samples_divisor = adc_samples_divisor
self.gamma = gamma
self.B0 = B0

Expand All @@ -142,6 +155,8 @@ def reset_default(cls):
rf_raster_time=1e-6,
grad_raster_time=10e-6,
block_duration_raster=10e-6,
adc_samples_limit=0,
adc_samples_divisor=4,
gamma=42576000,
B0=1.5)

Expand Down
Loading

0 comments on commit 9130a82

Please sign in to comment.