Skip to content

Commit

Permalink
Merge pull request #270 from zmoon/cli-openaq
Browse files Browse the repository at this point in the history
CLI: `get-openaq`
  • Loading branch information
zmoon authored Nov 5, 2024
2 parents bc44e39 + 60fe02f commit ad184bf
Show file tree
Hide file tree
Showing 3 changed files with 299 additions and 3 deletions.
4 changes: 4 additions & 0 deletions docs/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ any Python code::
* |get-aqs|_ -- get AQS data
* |get-ish|_ -- get ISH data
* |get-ish-lite|_ -- get ISH-Lite data
* |get-openaq|_ -- get OpenAQ data

.. |run| replace:: ``run``
.. _run: #melodies-monet-run
Expand All @@ -37,6 +38,9 @@ any Python code::
.. |get-ish-lite| replace:: ``get-ish-lite``
.. _get-ish-lite: #melodies-monet-get-ish-lite

.. |get-openaq| replace:: ``get-openaq``
.. _get-openaq: #melodies-monet-get-openaq

.. click:: melodies_monet._cli:_typer_click_object
:prog: melodies-monet
:nested: full
251 changes: 248 additions & 3 deletions melodies_monet/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
"""
melodies-monet -- MELODIES MONET CLI
"""
import os
import time
from contextlib import contextmanager
from pathlib import Path
from typing import List
from typing import List, Tuple

_LOGGING_LEVEL = os.environ.get("MM_LOGGING_LEVEL", None)
if _LOGGING_LEVEL is not None:
import logging

logging.basicConfig(level=_LOGGING_LEVEL.upper())

try:
import typer
Expand All @@ -20,8 +27,6 @@
)
raise SystemExit(1)

from typing import Tuple

DEBUG = False
INFO_COLOR = typer.colors.CYAN
ERROR_COLOR = typer.colors.BRIGHT_RED
Expand Down Expand Up @@ -1174,6 +1179,246 @@ def get_aqs(
ds.to_netcdf(dst / out_name)


@app.command()
def get_openaq(
    start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"),
    end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"),
    out_name: str = typer.Option(None, "-o",
        help=(
            "Output file name (or full/relative path). "
            "By default the name is generated like 'OpenAQ_<start-date>_<end-date>.nc'."
        )
    ),
    dst: Path = typer.Option(".", "-d", "--dst", help=(
            "Destination directory (to control output location "
            "if using default output file name)."
        )
    ),
    param: List[str] = typer.Option(["o3", "pm25", "pm10"], "-p", "--param", help=(
            "Parameters. "
            "Use '-p' more than once to get multiple parameters. "
            "Other examples: 'no', 'no2', 'nox', 'so2', 'co', 'bc'. "
            "Only applicable to the web API methods ('api-v2')."
        )
    ),
    reference_grade: bool = typer.Option(True, help="Include reference-grade sensors."),
    low_cost: bool = typer.Option(False, help="Include low-cost sensors."),
    method: str = typer.Option("api-v2", "-m", "--method", help=(
            "Method (reader) to use for fetching data. "
            "Options: 'api-v2', 'openaq-fetches'."
        )
    ),
    compress: bool = typer.Option(True, help=(
            "If true, pack float to int and apply compression using zlib with complevel 7. "
            "This can take time if the dataset is large, but can lead to "
            "significant space savings."
        )
    ),
    num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."),
    verbose: bool = typer.Option(False),
    debug: bool = typer.Option(
        False, "--debug/", help="Print more messages (including full tracebacks)."
    ),
):
    """Download hourly OpenAQ data using monetio and reformat for MM usage.

    Fetches surface observations via one of two monetio readers
    ('api-v2' web API or 'openaq-fetches' S3 archive), de-duplicates
    time/site rows, reshapes the long-format DataFrame into an xarray
    Dataset with dims (time, y, x), and writes a netCDF file.
    """
    import warnings

    import monetio as mio
    import pandas as pd

    from .util.write_util import write_ncf

    global DEBUG

    DEBUG = debug

    typer.echo(HEADER)

    # Validate the reader selection up front so we fail fast with a clear message.
    if method not in {"api-v2", "openaq-fetches"}:
        typer.secho(f"Error: method {method!r} not recognized", fg=ERROR_COLOR)
        raise typer.Exit(2)

    start_date = pd.Timestamp(start_date)
    end_date = pd.Timestamp(end_date)

    # The two readers expect different date granularity:
    # openaq-fetches takes daily file dates; api-v2 takes hourly query times.
    if method in {"openaq-fetches"}:
        dates = pd.date_range(start_date, end_date, freq="D")
    elif method in {"api-v2"}:
        dates = pd.date_range(start_date, end_date, freq="h")
    else:
        raise AssertionError
    if verbose:
        print("Dates:")
        print(dates)

    if verbose and method in {"api-v2"}:
        print("Params:", param)

    # Set destination and file name
    fmt = r"%Y%m%d"
    if out_name is None:
        out_name = f"OpenAQ_{start_date:{fmt}}_{end_date:{fmt}}.nc"
    else:
        p = Path(out_name)
        if p.name == out_name:
            # `out_name` is just the file name
            out_name = p.name
        else:
            # `out_name` has path; its parent directory overrides `dst`
            if dst != Path("."):
                typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}")
            dst = p.parent
            out_name = p.name

    # Translate the boolean flags into the sensor-type strings the api-v2
    # reader expects. Only api-v2 filters by sensor type.
    sensor_types = []
    if reference_grade:
        sensor_types.append("reference grade")
    if low_cost:
        sensor_types.append("low-cost sensor")
    if not sensor_types and method in {"api-v2"}:
        typer.secho(
            "Error: no sensor types selected. Use --reference-grade and/or --low-cost",
            fg=ERROR_COLOR,
        )
        raise typer.Exit(2)

    if verbose and method in {"openaq-fetches"}:
        from dask.diagnostics import ProgressBar

        ProgressBar().register()

    with _timer("Fetching data with monetio"):
        if method == "openaq-fetches":
            with warnings.catch_warnings():
                # Suppress pandas read_csv deprecation noise from the reader
                warnings.filterwarnings(
                    "ignore",
                    message="The (error|warn)_bad_lines argument has been deprecated"
                )
                df = mio.openaq.add_data(
                    dates,
                    n_procs=num_workers,
                    # wide_fmt=True,
                )

                # Address time-wise non-unique site IDs
                # Some (most?) are just slightly different lat/lon
                # But seems like a few are actual time-wise lat/lon duplicates
                df = df.drop_duplicates(["time", "siteid"])

        elif method == "api-v2":
            df = mio.obs.openaq_v2.add_data(
                dates,
                parameters=param,
                sensor_type=sensor_types,
                wide_fmt=True,
                timeout=60,
                retry=15,
                threads=num_workers if num_workers > 1 else None,
            )

            dupes = df[df.duplicated(["time", "siteid"], keep=False)]
            if not dupes.empty:
                typer.echo(
                    f"warning: {len(dupes)} unexpected time-siteid duplicated rows:"
                )
                if verbose:
                    typer.echo(dupes)
                df = df.drop_duplicates(["time", "siteid"])
        else:
            raise AssertionError

        # Drop times not on the hour
        # NOTE: "H" is a deprecated pandas offset alias (use "h" in new pandas)
        good = df.time == df.time.dt.floor("H")
        typer.echo(f"Dropping {(~good).sum()}/{len(good)} rows that aren't on the hour.")
        df = df[good]

    with _timer("Forming xarray Dataset"):
        df = df.drop(columns=["index"], errors="ignore")
        # Rows without coordinates cannot be placed on the site (x) dimension
        df = df.dropna(subset=["latitude", "longitude"])

        # Variables that are constant per site, to be stored once per site
        # rather than per (time, site) row.
        if method == "openaq-fetches":
            site_vns = [
                "siteid",  # based on country and lat/lon
                "latitude",
                "longitude",
                "utcoffset",
                #
                "city",
                "country",  # 2-char codes
                #
                "sourceName",
                "sourceType",  # "government"
            ]
            # NOTE: time_local not included since it varies in time as well
        elif method == "api-v2":
            site_vns = [
                "siteid",  # real OpenAQ location ID
                "latitude",
                "longitude",
                "utcoffset",
                #
                "location",
                "city",
                "country",
                #
                "entity",
                "sensor_type",
                "is_mobile",
                "is_analysis",
            ]
            for vn in ["city", "is_analysis"]:  # may have been dropped for being all null
                if vn not in df.columns:
                    site_vns.remove(vn)

        else:
            raise AssertionError

        # Per-site metadata: first occurrence wins, indexed along a plain
        # integer 'x' dim (MM convention).
        ds_site = (
            df[site_vns]
            .groupby("siteid")
            .first()
            .to_xarray()
            .swap_dims(siteid="x")
        )

        # Time-varying data on (time, x), merged with the static site info
        ds = (
            df.drop(columns=[vn for vn in site_vns if vn not in ["siteid"]])
            .set_index(["time", "siteid"])
            .to_xarray()
            .swap_dims(siteid="x")
            .merge(ds_site)
            .set_coords(["latitude", "longitude"])
            .assign(x=range(ds_site.dims["x"]))
        )

        # Rename species vars and add units as attr
        # Columns are named like '<species>_<unit>'; strip the unit suffix
        # and record a CF-style units attribute instead.
        nice_us = {"ppm": "ppmv", "ugm3": "ug m-3", "ppb": "ppbv"}  # was "pbbv" (typo)
        for vn0 in [n for n in df.columns if n.endswith(("_ppm", "_ppb", "_ugm3", "_umg3"))]:
            i_last_underscore = vn0.rfind("_")
            vn, u = vn0[:i_last_underscore], vn0[i_last_underscore + 1:]
            if u == "umg3":  # normalize occasional transposed suffix
                u = "ugm3"
            nice_u = nice_us[u]
            ds[vn0].attrs.update(units=nice_u)
            ds = ds.rename_vars({vn0: vn})

        # Fill in local time array
        # (in the df, not all sites have rows for all times, so we have NaTs at this point)
        ds["time_local"] = ds.time + ds.utcoffset

        # Expand to the (time, y, x) layout MM expects (y is a singleton dim)
        ds = (
            ds
            .expand_dims("y")
            .transpose("time", "y", "x")
        )

    with _timer("Writing netCDF file"):
        if compress:
            write_ncf(ds, dst / out_name, verbose=verbose)
        else:
            ds.to_netcdf(dst / out_name)

cli = app

_typer_click_object = typer.main.get_command(app) # for sphinx-click in docs
Expand Down
47 changes: 47 additions & 0 deletions melodies_monet/tests/test_get_data_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
Check for consistency with the tutorial datasets and that options work.
"""
import os
import subprocess

import numpy as np
Expand All @@ -30,6 +31,8 @@
ds0_aeronet = xr.open_dataset(fetch_example("aeronet:2019-09"))
ds0_airnow = xr.open_dataset(fetch_example("airnow:2019-09"))

have_openaq_api_key = len(os.environ.get("OPENAQ_API_KEY", "")) > 0


def test_get_aeronet_no_data_err():
cmd = [
Expand Down Expand Up @@ -201,3 +204,47 @@ def test_get_aqs_hourly(tmp_path):
for v in ds.data_vars
if ds[v].dims == ("time", "y", "x")
} == {"OZONE", "time_local"}


@pytest.mark.skipif(not have_openaq_api_key, reason="OPENAQ_API_KEY not set")
def test_get_openaq(tmp_path):
    """Fetch one hour of default-parameter OpenAQ data via the CLI and
    check the resulting Dataset's variables and sensor type."""
    fn = "x.nc"
    cmd = [
        "melodies-monet", "get-openaq",
        # NOTE: original had `"-e" "2024-09-10 00:59"` (missing comma),
        # which relied on implicit string concatenation / click's
        # attached-value parsing. Pass option and value separately.
        "-s", "2024-09-10", "-e", "2024-09-10 00:59",
        "--dst", tmp_path.as_posix(), "-o", fn,
    ]
    subprocess.run(cmd, check=True)

    ds = xr.open_dataset(tmp_path / fn)

    # One hourly time step requested
    assert ds.time.size == 1
    # Default params (o3, pm25, pm10) plus the derived local-time array
    assert {
        v
        for v in ds.data_vars
        if ds[v].dims == ("time", "y", "x")
    } == {"o3", "pm25", "pm10", "time_local"}
    # Default sensor selection is reference-grade only
    assert (ds.sensor_type == "reference grade").all()


@pytest.mark.skipif(not have_openaq_api_key, reason="OPENAQ_API_KEY not set")
def test_get_openaq_lowcost(tmp_path):
    """Fetch one hour of low-cost-sensor PM2.5 OpenAQ data via the CLI and
    check the resulting Dataset's variables and sensor type."""
    fn = "x.nc"
    cmd = [
        "melodies-monet", "get-openaq",
        # NOTE: original had `"-e" "2024-09-10 00:59"` (missing comma),
        # which relied on implicit string concatenation / click's
        # attached-value parsing. Pass option and value separately.
        "-s", "2024-09-10", "-e", "2024-09-10 00:59",
        "-p", "pm25",
        "--no-reference-grade", "--low-cost",
        "--dst", tmp_path.as_posix(), "-o", fn,
    ]
    subprocess.run(cmd, check=True)

    ds = xr.open_dataset(tmp_path / fn)

    # One hourly time step requested
    assert ds.time.size == 1
    # Only pm25 requested, plus the derived local-time array
    assert {
        v
        for v in ds.data_vars
        if ds[v].dims == ("time", "y", "x")
    } == {"pm25", "time_local"}
    # Only low-cost sensors selected
    assert (ds.sensor_type == "low-cost sensor").all()

0 comments on commit ad184bf

Please sign in to comment.