forked from MHKiT-Software/MHKiT-Python
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Convert power module to xarray (MHKiT-Software#282)
* power/quality add basic formatting * initial conversion or power/quality submodule * add xarray tests for power.quality * fix variable assignment in power.quality * power.characteristics add basic formatting for xr conversion * update error messages * finish converting power.quality to xarray * fix spaces/formating in docstrings * clean up conversion of inputs from pandas to xr.dataset * add tests for xarray * update handling of timestamps in power.characteristics for xr * fix length in power.quality.harmonics * fix length in power.quality.harmonics, again * add frequency_dimension and time_dimension arguments * remove old imports * remove obsolete argument from THCD tests * type check on to_pandas * add type and value checks for time_dimension and frequency_dimension * make grid_freq checks f strings that return the incorrect value * add frequency_dimension valueError * add formal docstring to _convert_to_dataset * add line_to_line type check * restore old naming convention to ac_power_three_phase * update example call to THCD * return hard coded test answers to being recalculated
- Loading branch information
Showing
4 changed files
with
543 additions
and
291 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,121 +1,249 @@ | ||
import pandas as pd | ||
import xarray as xr | ||
import numpy as np | ||
from scipy.signal import hilbert | ||
import datetime | ||
|
||
def instantaneous_frequency(um): | ||
def instantaneous_frequency(um, time_dimension="", to_pandas=True): | ||
|
||
""" | ||
Calculates instantaneous frequency of measured voltage | ||
Parameters | ||
----------- | ||
um: pandas Series or DataFrame | ||
Measured voltage (V) indexed by time | ||
um: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset | ||
Measured voltage (V) indexed by time | ||
time_dimension: string (optional) | ||
Name of the xarray dimension corresponding to time. If not supplied, | ||
defaults to the first dimension. Does not affect pandas input. | ||
to_pandas: bool (Optional) | ||
Flag to save output to pandas instead of xarray. Default = True. | ||
Returns | ||
--------- | ||
frequency: pandas DataFrame | ||
Frequency of the measured voltage (Hz) indexed by time | ||
frequency: pandas DataFrame or xarray Dataset | ||
Frequency of the measured voltage (Hz) indexed by time | ||
with signal name columns | ||
""" | ||
if not isinstance(um, (pd.Series, pd.DataFrame)): | ||
raise TypeError(f'um must be of type pd.Series or pd.DataFrame. Got: {type(um)}') | ||
|
||
if isinstance(um.index[0], datetime.datetime): | ||
t = (um.index - datetime.datetime(1970,1,1)).total_seconds() | ||
else: | ||
t = um.index | ||
if not isinstance(um, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)): | ||
raise TypeError('um must be of type pd.Series, pd.DataFrame, ' + | ||
f'xr.DataArray, or xr.Dataset. Got {type(um)}') | ||
if not isinstance(to_pandas, bool): | ||
raise TypeError( | ||
f'to_pandas must be of type bool. Got: {type(to_pandas)}') | ||
if not isinstance(time_dimension, str): | ||
raise TypeError( | ||
f'time_dimension must be of type bool. Got: {type(time_dimension)}') | ||
|
||
# Convert input to xr.Dataset | ||
um = _convert_to_dataset(um, 'data') | ||
|
||
if time_dimension != '' and time_dimension not in um.coords: | ||
raise ValueError('time_dimension was supplied but is not a dimension ' | ||
+ f'of um. Got {time_dimension}') | ||
|
||
dt = pd.Series(t).diff()[1:] | ||
# Get the dimension of interest | ||
if time_dimension == "": | ||
time_dimension = list(um.coords)[0] | ||
|
||
if isinstance(um,pd.Series): | ||
um = um.to_frame() | ||
# Calculate time step | ||
if isinstance(um.coords[time_dimension].values[0], np.datetime64): | ||
t = (um[time_dimension] - np.datetime64('1970-01-01 00:00:00'))/np.timedelta64(1, 's') | ||
else: | ||
t = um[time_dimension] | ||
dt = np.diff(t) | ||
|
||
columns = um.columns | ||
frequency=pd.DataFrame(columns=columns) | ||
for column in um.columns: | ||
f = hilbert(um[column]) | ||
# Calculate frequency | ||
frequency = xr.Dataset() | ||
for var in um.data_vars: | ||
f = hilbert(um[var]) | ||
instantaneous_phase = np.unwrap(np.angle(f)) | ||
instantaneous_frequency = np.diff(instantaneous_phase) /(2.0*np.pi) * (1/dt) | ||
frequency[column] = instantaneous_frequency | ||
|
||
instantaneous_frequency = np.diff(instantaneous_phase)/(2.0*np.pi) * (1/dt) | ||
|
||
frequency = frequency.assign({var: (time_dimension, instantaneous_frequency)}) | ||
frequency = frequency.assign_coords({time_dimension: um.coords[time_dimension].values[0:-1]}) | ||
|
||
if to_pandas: | ||
frequency = frequency.to_pandas() | ||
|
||
return frequency | ||
|
||
def dc_power(voltage, current): | ||
def dc_power(voltage, current, to_pandas=True): | ||
""" | ||
Calculates DC power from voltage and current | ||
Parameters | ||
----------- | ||
voltage: pandas Series or DataFrame | ||
voltage: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset | ||
Measured DC voltage [V] indexed by time | ||
current: pandas Series or DataFrame | ||
current: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset | ||
Measured three phase current [A] indexed by time | ||
to_pandas: bool (Optional) | ||
Flag to save output to pandas instead of xarray. Default = True. | ||
Returns | ||
-------- | ||
P: pandas DataFrame | ||
P: pandas DataFrame or xarray Dataset | ||
DC power [W] from each channel and gross power indexed by time | ||
""" | ||
if not isinstance(voltage, (pd.Series, pd.DataFrame)): | ||
raise TypeError(f'voltage must be of type pd.Series or pd.DataFrame. Got: {type(voltage)}') | ||
if not isinstance(current, (pd.Series, pd.DataFrame)): | ||
raise TypeError(f'current must be of type pd.Series or pd.DataFrame. Got: {type(current)}') | ||
if not voltage.shape == current.shape: | ||
raise ValueError('current and volatge must have the same shape') | ||
|
||
if not isinstance(voltage, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)): | ||
raise TypeError('voltage must be of type pd.Series, pd.DataFrame, ' + | ||
f'xr.DataArray, or xr.Dataset. Got {type(voltage)}') | ||
if not isinstance(current, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)): | ||
raise TypeError('current must be of type pd.Series, pd.DataFrame, ' + | ||
f'xr.DataArray, or xr.Dataset. Got {type(current)}') | ||
if not isinstance(to_pandas, bool): | ||
raise TypeError( | ||
f'to_pandas must be of type bool. Got: {type(to_pandas)}') | ||
|
||
# Convert inputs to xr.Dataset | ||
voltage = _convert_to_dataset(voltage, 'voltage') | ||
current = _convert_to_dataset(current, 'current') | ||
|
||
# Check that sizes are the same | ||
if not (voltage.sizes == current.sizes and len(voltage.data_vars) == len(current.data_vars)): | ||
raise ValueError('current and voltage must have the same shape') | ||
|
||
P = xr.Dataset() | ||
gross = None | ||
|
||
P = current.values * voltage.values | ||
P = pd.DataFrame(P) | ||
P['Gross'] = P.sum(axis=1, skipna=True) | ||
# Multiply current and voltage variables together, in order they're assigned | ||
for i, (current_var, voltage_var) in enumerate(zip(current.data_vars,voltage.data_vars)): | ||
temp = current[current_var]*voltage[voltage_var] | ||
P = P.assign({f'{i}': temp}) | ||
if gross is None: | ||
gross = temp | ||
else: | ||
gross = gross + temp | ||
|
||
P = P.assign({'Gross': gross}) | ||
|
||
if to_pandas: | ||
P = P.to_dataframe() | ||
|
||
return P | ||
|
||
def ac_power_three_phase(voltage, current, power_factor, line_to_line=False): | ||
def ac_power_three_phase(voltage, current, power_factor, line_to_line=False, to_pandas=True): | ||
""" | ||
Calculates magnitude of active AC power from line to neutral voltage and current | ||
Parameters | ||
----------- | ||
voltage: pandas DataFrame | ||
Time-series of three phase measured voltage [V] indexed by time | ||
current: pandas DataFrame | ||
Time-series of three phase measured current [A] indexed by time | ||
voltage: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset | ||
Measured DC voltage [V] indexed by time | ||
current: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset | ||
Measured three phase current [A] indexed by time | ||
power_factor: float | ||
Power factor for the efficiency of the system | ||
line_to_line: bool | ||
line_to_line: bool (Optional) | ||
Set to true if the given voltage measurements are line_to_line | ||
to_pandas: bool (Optional) | ||
Flag to save output to pandas instead of xarray. Default = True. | ||
Returns | ||
-------- | ||
P: pandas DataFrame | ||
P: pandas DataFrame or xarray Dataset | ||
Magnitude of active AC power [W] indexed by time with Power column | ||
""" | ||
if not isinstance(voltage, pd.DataFrame): | ||
raise TypeError(f'voltage must be of type pd.DataFrame. Got: {type(voltage)}') | ||
if not isinstance(current, pd.DataFrame): | ||
raise TypeError(f'current must be of type pd.DataFrame. Got: {type(current)}') | ||
if not len(voltage.columns) == 3: | ||
if not isinstance(voltage, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)): | ||
raise TypeError('voltage must be of type pd.Series, pd.DataFrame, ' + | ||
f'xr.DataArray, or xr.Dataset. Got {type(voltage)}') | ||
if not isinstance(current, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)): | ||
raise TypeError('current must be of type pd.Series, pd.DataFrame, ' + | ||
f'xr.DataArray, or xr.Dataset. Got {type(current)}') | ||
if not isinstance(line_to_line, bool): | ||
raise TypeError( | ||
f'line_to_line must be of type bool. Got: {type(line_to_line)}') | ||
if not isinstance(to_pandas, bool): | ||
raise TypeError( | ||
f'to_pandas must be of type bool. Got: {type(to_pandas)}') | ||
|
||
# Convert inputs to xr.Dataset | ||
voltage = _convert_to_dataset(voltage, 'voltage') | ||
current = _convert_to_dataset(current, 'current') | ||
|
||
# Check that sizes are the same | ||
if not len(voltage.data_vars) == 3: | ||
raise ValueError('voltage must have three columns') | ||
if not len(current.columns) == 3: | ||
if not len(current.data_vars) == 3: | ||
raise ValueError('current must have three columns') | ||
if not current.shape == voltage.shape: | ||
if not current.sizes == voltage.sizes: | ||
raise ValueError('current and voltage must be of the same size') | ||
|
||
|
||
abs_current = np.abs(current.values) | ||
abs_voltage = np.abs(voltage.values) | ||
power = dc_power(voltage, current, to_pandas=False)['Gross'] | ||
power.name = 'Power' | ||
power = power.to_dataset() # force xr.DataArray to be consistently in xr.Dataset format | ||
P = np.abs(power) * power_factor | ||
|
||
if line_to_line: | ||
power = abs_current * (abs_voltage * np.sqrt(3)) | ||
else: | ||
power = abs_current * abs_voltage | ||
|
||
power = pd.DataFrame(power) | ||
P = power.sum(axis=1) * power_factor | ||
P = P.to_frame('Power') | ||
|
||
P = P * np.sqrt(3) | ||
|
||
if to_pandas: | ||
P = P.to_pandas() | ||
|
||
return P | ||
|
||
def _convert_to_dataset(data, name='data'): | ||
""" | ||
Converts the given data to an xarray.Dataset. | ||
This function is designed to handle inputs that can be either a pandas DataFrame, a pandas Series, | ||
an xarray DataArray, or an xarray Dataset. It ensures that the output is consistently an xarray.Dataset. | ||
Parameters | ||
---------- | ||
data: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset | ||
The data to be converted. | ||
name: str (Optional) | ||
The name to assign to the data variable in case the input is an xarray DataArray without a name. | ||
Default value is 'data'. | ||
Returns | ||
------- | ||
xarray.Dataset | ||
The input data converted to an xarray.Dataset. If the input is already an xarray.Dataset, | ||
it is returned as is. | ||
Examples | ||
-------- | ||
>>> df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]}) | ||
>>> ds = _convert_to_dataset(df) | ||
>>> type(ds) | ||
<class 'xarray.core.dataset.Dataset'> | ||
>>> series = pd.Series([1, 2, 3], name='C') | ||
>>> ds = _convert_to_dataset(series) | ||
>>> type(ds) | ||
<class 'xarray.core.dataset.Dataset'> | ||
>>> data_array = xr.DataArray([1, 2, 3]) | ||
>>> ds = _convert_to_dataset(data_array, name='D') | ||
>>> type(ds) | ||
<class 'xarray.core.dataset.Dataset'> | ||
""" | ||
if not isinstance(data, (pd.DataFrame, pd.Series, xr.DataArray, xr.Dataset)): | ||
raise TypeError("Input data must be of type pandas.DataFrame, pandas.Series, " | ||
"xarray.DataArray, or xarray.Dataset") | ||
|
||
if not isinstance(name, str): | ||
raise TypeError("The 'name' parameter must be a string") | ||
|
||
# Takes data that could be pd.DataFrame, pd.Series, xr.DataArray, or | ||
# xr.Dataset and converts it to xr.Dataset | ||
if isinstance(data, (pd.DataFrame, pd.Series)): | ||
data = data.to_xarray() | ||
|
||
if isinstance(data, xr.DataArray): | ||
if data.name is None: | ||
data.name = name # xr.DataArray.to_dataset() breaks if the data variable is unnamed | ||
data = data.to_dataset() | ||
|
||
return data |
Oops, something went wrong.