Skip to content

Commit

Permalink
Allow user provided functions to write to disc (#35)
Browse files Browse the repository at this point in the history
* allow user provided functions to write to disc

* additional saving example

* proofread docs

* correct return statement

* correct kwarg name in tutorial
  • Loading branch information
rileyhales authored Jul 8, 2024
1 parent 39c8181 commit fc1f331
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 36 deletions.
4 changes: 0 additions & 4 deletions docs/advanced-skills.md

This file was deleted.

4 changes: 2 additions & 2 deletions docs/configs/config-options.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Configuration File

River Route computations are controlled by several variables. You can pass these variables as keyword arguments to the
`river-route` computations are controlled by several variables. You can pass these variables as keyword arguments to the
corresponding functions or provide a path to a configuration file. Supported file formats for config files are YAML or
JSON. Config files specify the following parameters:

Expand All @@ -12,7 +12,7 @@ JSON. Config files specify the following parameters:

## Minimum Required Inputs

Every river-route process needs at least the following 4 variables
Every `river-route` process needs at least the following 4 variables

- `routing_params_file` - path to the [routing parameters file](io-files.md#routing-parameters) (parquet)
- `connectivity_file` - path to the river network [connectivity file](io-files.md#connectivity-file) (parquet)
Expand Down
2 changes: 1 addition & 1 deletion docs/configs/io-files.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ The file contains 2 dimensions: time and river_id.
It will have 1 variable named "Q" which is an array of shape (time, river_id) of dtype float.
You can change the structure of the output file by overriding the default function to write outputs to disc. See the
[Advanced Skills](../advanced-skills.md)
[Saving Outputs](../saving-outputs) page for more information.
## Initial and Final State Files
Expand Down
8 changes: 4 additions & 4 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# River Route
# River-Route

The River Route Python package is a tool for routing catchment runoff volumes on vector stream networks using the
The `river-route` Python package is a tool for routing catchment runoff volumes on vector stream networks using the
Matrix Muskingum Cunge Method.

## Installation

It is recommended to install river-route in its own environment so you can ensure the latest versions of all the
It is recommended to install `river-route` in its own environment so you can ensure the latest versions of all the
dependencies and to protect the environment that produces your model results from accidents.

River-Route is currently only available from source.
`river-route` is currently only available from source.

```commandline
git clone https://github.com/rileyhales/river-route
Expand Down
105 changes: 105 additions & 0 deletions docs/saving-outputs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
## Writing Routed Flows to Disc

You can override the default function used by `river-route` when writing routed flows to disc. The MuskingumCunge class
formats the discharge data into a Pandas DataFrame and then calls the `write_outflows` method. By default, this function
writes the dataframe to a netCDF file.

A single netCDF is not ideal for all use cases so you can override it to store your data how you prefer. Some examples
of reasons you would want to do this include appending the outputs to an existing file, writing the DataFrame to a
database, or to add metadata or attributes to the file.

You can override the `write_outflows` method directly in your code or use the `set_write_outflows` method. Your custom
function should accept exactly 3 keyword arguments:

1. `df`: a Pandas DataFrame with a datetime index, river id numbers as column labels, and float discharge values.
2. `outflow_file`: a string with the path to the output file.
3. `runoff_file`: a string with the path to the runoff file used to produce this output.

As an example, you might want to write the output DataFrame to a Parquet file instead.

```python title="Write Routed Flows to Parquet"
import pandas as pd

import river_route as rr


def custom_write_outflows(df: pd.DataFrame, outflow_file: str, runoff_file: str) -> None:
df.to_parquet(outflow_file)
return


(
rr
.MuskingumCunge('config.yaml')
.set_write_outflows(custom_write_outflows)
.route()
)
```

```python title="Write Routed Flows to SQLite"
import pandas as pd
import sqlite3

import river_route as rr


def write_outflows_to_sqlite(df: pd.DataFrame, outflow_file: str, runoff_file: str) -> None:
conn = sqlite3.connect(outflow_file)
df.to_sql('routed_flows', conn, if_exists='replace')
conn.close()
return


(
rr
.MuskingumCunge('config.yaml')
.set_write_outflows(write_outflows_to_sqlite)
.route()
)
```

```python title="Append Routed Flows to Existing netCDF"
import os

import pandas as pd
import xarray as xr

import river_route as rr


def append_to_existing_file(df: pd.DataFrame, outflow_file: str, runoff_file: str) -> None:
ensemble_number = os.path.basename(runoff_file).split('_')[1]
with xr.open_dataset(outflow_file) as ds:
ds.sel(ensemble=ensemble_number).Q = df.values
ds.to_netcdf(outflow_file)
return


(
rr
.MuskingumCunge('config.yaml')
.set_write_outflows(append_to_existing_file)
.route()
)
```

```python title="Save a Subset of the Routed Flows"
import pandas as pd

import river_route as rr


def save_partial_results(df: pd.DataFrame, outflow_file: str, runoff_file: str) -> None:
river_ids_to_save = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
df = df[river_ids_to_save]
df.to_parquet(outflow_file)
return


(
rr
.MuskingumCunge('config.yaml')
.set_write_outflows(save_partial_results)
.route()
)
```
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ nav:
- Input & Output Files: configs/io-files.md
- Time Variables: configs/time-variables.md
- API Documentation: api.md
- Advanced Skills: advanced-skills.md
- Saving Outputs: saving-outputs.md
plugins:
- search
- mkdocstrings
Expand Down
64 changes: 41 additions & 23 deletions river_route/_MuskingumCunge.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class MuskingumCunge:
# Calibration variables
_calibration_iteration_number: int

# Methods
write_outflows: callable

def __init__(self, config_file: str = None, **kwargs, ):
self.set_configs(config_file, **kwargs)
return
Expand Down Expand Up @@ -345,7 +348,7 @@ def route(self, **kwargs) -> 'MuskingumCunge':

LOG.info('Writing Outflow Array to File')
outflow_array = np.round(outflow_array, decimals=2)
self._write_outflows(outflow_file, dates, outflow_array)
self._write_outflows(dates, outflow_array, outflow_file, runoff_file)

# write the final state to disc
self._write_final_state()
Expand Down Expand Up @@ -546,30 +549,58 @@ def _calibration_objective(self,
del self.initial_state
return mse

def _write_outflows(self, outflow_file: str, dates: np.array, outflow_array: np.array) -> None:
reference_date = datetime.datetime.fromtimestamp(dates[0].astype(int), tz=datetime.timezone.utc)
def _write_outflows(self, dates: np.array, outflow_array: np.array, outflow_file: str, runoff_file: str) -> None:
dates = dates[::self.num_runoff_steps_per_outflow].astype('datetime64[s]')
dates = dates - dates[0]
df = pd.DataFrame(outflow_array, index=dates, columns=self._read_river_ids())
self.write_outflows(df=df, outflow_file=outflow_file, runoff_file=runoff_file)
return

def write_outflows(self, df: pd.DataFrame, outflow_file: str, runoff_file: str) -> None:
"""
Writes the outflows from a routing simulation to a netcdf file. You should overwrite this method with a custom
handler that writes it in a format that fits your needs.
Args:
df: a Pandas DataFrame with a datetime Index, river_id column names, and discharge values
outflow_file: the file path to write the outflows to
runoff_file: the file path to the runoff file used to generate the outflows
Returns:
None
"""
with nc.Dataset(outflow_file, mode='w', format='NETCDF4') as ds:
ds.createDimension('time', size=dates.shape[0])
ds.createDimension(self.conf['var_river_id'], size=outflow_array.shape[1])
ds.createDimension('time', size=df.shape[0])
ds.createDimension(self.conf['var_river_id'], size=df.shape[1])

time_var = ds.createVariable('time', 'f8', ('time',))
time_var.units = f'seconds since {reference_date.strftime("%Y-%m-%d %H:%M:%S")}'
time_var[:] = dates
time_var.units = f'seconds since {df.index[0].strftime("%Y-%m-%d %H:%M:%S")}'
time_var[:] = df.index.values - df.index.values[0]

id_var = ds.createVariable(self.conf['var_river_id'], 'i4', (self.conf['var_river_id']), )
id_var[:] = self._read_river_ids()
id_var[:] = df.columns.values

flow_var = ds.createVariable(self.conf['var_discharge'], 'f4', ('time', self.conf['var_river_id']))
flow_var[:] = outflow_array
flow_var[:] = df.values
flow_var.long_name = 'Discharge at catchment outlet'
flow_var.standard_name = 'discharge'
flow_var.aggregation_method = 'mean'
flow_var.units = 'm3 s-1'
return

def set_write_outflows(self, func: callable) -> 'MuskingumCunge':
"""
Overwrites the default write_outflows method to a custom function and returns the class instance so that you
can chain the method with the constructor.
Args:
func (callable): a function that takes 3 keyword arguments: df, outflow_file, runoff_file and returns None
Returns:
river_route.MuskingumCunge
"""
self.write_outflows = func
return self

def hydrograph(self, river_id: int) -> pd.DataFrame:
"""
Get the hydrograph for a given river id as a pandas dataframe
Expand Down Expand Up @@ -638,16 +669,3 @@ def mass_balance(self, river_id: int, ancestors: list = None) -> pd.DataFrame:
LOG.warning(f'More discharge than runoff volume for river {river_id}')

return df

def save_configs(self, path: str) -> None:
"""
Save the current configs of the class to a json file
Args:
path: the file path where the json will be written
Returns:
None
"""
with open(path, 'w') as f:
json.dump(self.conf, f)
return
2 changes: 1 addition & 1 deletion river_route/__metadata__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '0.13.0'
__version__ = '0.14.0'
__author__ = 'Riley Hales PhD'
__url__ = 'https://github.com/rileyhales/river-route'

0 comments on commit fc1f331

Please sign in to comment.