Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate ETL functions #15

Merged
merged 9 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ jobs:
python-version: '3.10'
cache: 'pip'

# Install runtime and development requirements first so that the subsequent
# package install and test run use pinned dependency versions.
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install -r requirements-dev.txt

# Install the package under test into the CI environment.
- name: Install package
run: |
python -m pip install .

# Execute the test suite.
- name: Run tests
run: |
python -m pytest
145 changes: 145 additions & 0 deletions offsets_db_data/apx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import numpy as np # noqa: F401
import pandas as pd
import pandas_flavor as pf

from offsets_db_data.common import (
CREDIT_SCHEMA_UPATH,
PROJECT_SCHEMA_UPATH,
load_column_mapping,
load_inverted_protocol_mapping,
load_protocol_mapping,
load_registry_project_column_mapping,
)
from offsets_db_data.credits import * # noqa: F403
from offsets_db_data.models import credit_without_id_schema, project_schema
from offsets_db_data.projects import * # noqa: F403


@pf.register_dataframe_method
def determine_transaction_type(df: pd.DataFrame, *, download_type: str) -> pd.DataFrame:
    """Add a 'transaction_type' column derived from the download type.

    Maps the plural download-type label (e.g. 'issuances') to its singular
    transaction label ('issuance') and stamps it onto every row.

    Raises
    ------
    KeyError
        If *download_type* is not one of the known download types.
    """
    singular_by_download = {
        'issuances': 'issuance',
        'retirements': 'retirement',
        'cancellations': 'cancellation',
    }
    # A KeyError here deliberately surfaces an unsupported download_type.
    df['transaction_type'] = singular_by_download[download_type]
    return df


@pf.register_dataframe_method
def process_apx_credits(
    df: pd.DataFrame, *, download_type: str, registry_name: str, arb: pd.DataFrame | None = None
) -> pd.DataFrame:
    """Normalize a raw APX credits download into the common credit schema.

    Renames raw columns via the registry's configured mapping, tags the
    registry and transaction type, parses dates, aggregates issuance rows,
    validates against ``credit_without_id_schema`` and, when provided,
    merges in ARB compliance data.
    """
    working = df.copy()

    column_mapping = load_column_mapping(
        registry_name=registry_name, download_type=download_type, mapping_path=CREDIT_SCHEMA_UPATH
    )
    # The config maps schema-name -> raw-name; rename() needs raw -> schema.
    rename_map = {raw: schema for schema, raw in column_mapping.items()}

    working = working.set_registry(registry_name=registry_name)
    working = working.determine_transaction_type(download_type=download_type)
    working = working.rename(columns=rename_map)
    working = working.convert_to_datetime(columns=['transaction_date'])

    if download_type == 'issuances':
        working = working.aggregate_issuance_transactions()

    working = working.validate(schema=credit_without_id_schema)
    if arb is not None and not arb.empty:
        working = working.merge_with_arb(arb=arb)
    return working


# Raw ACR/CAR compliance-program status strings mapped to harmonized values.
# Hoisted to module level so the dict is built once rather than on every row
# when this function is used with ``DataFrame.apply``.  Note the raw data
# contains both an ASCII-hyphen and an en-dash variant of
# 'Listed - Active ARB Project'; both are kept deliberately.
ACR_COMPLIANCE_STATE_MAP = {
    'Listed - Active ARB Project': 'active',
    'ARB Completed': 'completed',
    'ARB Inactive': 'completed',
    'Listed - Proposed Project': 'listed',
    'Listed - Active Registry Project': 'listed',
    'ARB Terminated': 'completed',
    'Submitted': 'listed',
    'Transferred ARB or Ecology Project': 'active',
    'Listed – Active ARB Project': 'active',
}


def harmonize_acr_status(row: pd.Series) -> str:
    """Derive single project status for CAR and ACR projects.

    Raw CAR and ACR data has two status columns -- one for compliance status,
    one for voluntary.  If the project is not in the compliance program, the
    lower-cased voluntary status wins; otherwise the compliance status is
    translated through ``ACR_COMPLIANCE_STATE_MAP``.

    Parameters
    ----------
    row : pd.Series
        A row from a pandas DataFrame; must contain the keys
        'Compliance Program Status (ARB or Ecology)' and 'Voluntary Status'.

    Returns
    -------
    str
        The harmonized status, or 'unknown' for an unmapped compliance status.
    """
    compliance_status = row['Compliance Program Status (ARB or Ecology)']
    if compliance_status == 'Not ARB or Ecology Eligible':
        return row['Voluntary Status'].lower()
    return ACR_COMPLIANCE_STATE_MAP.get(compliance_status, 'unknown')


@pf.register_dataframe_method
def add_project_url(df: pd.DataFrame, *, registry_name: str) -> pd.DataFrame:
    """Add a 'project_url' column pointing at the registry's APX project page.

    The URL is the registry-specific base plus the project id with its first
    three characters stripped (presumably a registry prefix — the APX sites
    key on the bare id).
    """
    base_urls = {
        'american-carbon-registry': 'https://acr2.apx.com/mymodule/reg/prjView.asp?id1=',
        'climate-action-reserve': 'https://thereserve2.apx.com/mymodule/reg/prjView.asp?id1=',
        'art-trees': 'https://art.apx.com/mymodule/reg/prjView.asp?id1=',
    }
    try:
        base = base_urls[registry_name]
    except KeyError:
        raise ValueError(f'Unknown registry name: {registry_name}') from None

    df['project_url'] = base + df['project_id'].str[3:]
    return df


@pf.register_dataframe_method
def process_apx_projects(
    df: pd.DataFrame, *, credits: pd.DataFrame, registry_name: str
) -> pd.DataFrame:
    """Normalize a raw APX projects download into the common project schema.

    Renames raw columns per the registry's configured mapping, attaches
    protocol/category and harmonized status, then runs the shared enrichment
    chain (registry tag, URL, country names, compliance flag, credit totals,
    dates) and validates against ``project_schema``.
    """
    projects = df.copy()
    credit_data = credits.copy()

    column_mapping = load_registry_project_column_mapping(
        registry_name=registry_name, file_path=PROJECT_SCHEMA_UPATH
    )
    # Config maps schema-name -> raw-name; rename() needs raw -> schema.
    rename_map = {raw: schema for schema, raw in column_mapping.items()}
    protocol_mapping = load_protocol_mapping()
    inverted_protocol_mapping = load_inverted_protocol_mapping()

    projects = projects.rename(columns=rename_map)

    if registry_name == 'art-trees':
        # art-trees projects all share a single fixed protocol/category.
        projects['protocol'] = [['art-trees']] * len(projects)
        projects['category'] = [['forest']] * len(projects)
    else:
        projects = projects.map_protocol(
            inverted_protocol_mapping=inverted_protocol_mapping
        ).add_category(protocol_mapping=protocol_mapping)

    if registry_name == 'american-carbon-registry':
        # ACR carries two raw status columns; collapse them row-by-row.
        projects['status'] = projects.apply(harmonize_acr_status, axis=1)
    else:
        projects = projects.harmonize_status_codes()

    projects = (
        projects.set_registry(registry_name=registry_name)
        .add_project_url(registry_name=registry_name)
        .harmonize_country_names()
        .add_is_compliance_flag()
        .add_retired_and_issued_totals(credits=credit_data)
        .add_first_issuance_and_retirement_dates(credits=credit_data)
        .add_missing_columns(columns=project_schema.columns.keys())
        .convert_to_datetime(columns=['listed_at'])
        .validate(schema=project_schema)
    )
    return projects
2 changes: 2 additions & 0 deletions offsets_db_data/arb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import janitor # noqa: F401
import numpy as np
import pandas as pd
import pandas_flavor as pf


def _get_registry(item):
Expand All @@ -14,6 +15,7 @@ def _get_registry(item):
return registry_map.get(prefix)


@pf.register_dataframe_method
def process_arb_data(df: pd.DataFrame) -> pd.DataFrame:
"""

Expand Down
95 changes: 95 additions & 0 deletions offsets_db_data/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import json
from collections import defaultdict

import pandas as pd
import pandas_flavor as pf
import pandera as pa
import upath

# Path to the JSON config mapping raw registry credit columns to the
# normalized credit schema.
CREDIT_SCHEMA_UPATH = (
    upath.UPath(__file__).parents[0] / 'configs' / 'credits-raw-columns-mapping.json'
)

# Path to the JSON config describing protocol metadata (including
# 'known-strings' aliases used to invert the mapping).
PROTOCOL_MAPPING_UPATH = upath.UPath(__file__).parents[0] / 'configs' / 'all-protocol-mapping.json'
# Path to the JSON config mapping raw registry project columns to the
# normalized project schema.
PROJECT_SCHEMA_UPATH = (
    upath.UPath(__file__).parents[0] / 'configs' / 'projects-raw-columns-mapping.json'
)


def load_registry_project_column_mapping(
    *, registry_name: str, file_path: upath.UPath = PROJECT_SCHEMA_UPATH
) -> dict:
    """Return the project column mapping for one registry.

    The config file is keyed schema-column -> {registry -> raw-column}; this
    pivots it to registry -> {schema-column -> raw-column} and returns the
    requested registry's sub-mapping.  Falsy raw-column entries are skipped
    (the registry bucket is still created).

    Raises KeyError if *registry_name* is absent from the config.
    """
    with open(file_path) as fp:
        raw = json.load(fp)

    pivoted: dict = {}
    for schema_column, per_registry in raw.items():
        for registry, raw_column in per_registry.items():
            bucket = pivoted.setdefault(registry, {})
            if raw_column:
                bucket[schema_column] = raw_column
    return pivoted[registry_name]


def load_protocol_mapping(path: upath.UPath = PROTOCOL_MAPPING_UPATH) -> dict:
    """Load the protocol metadata mapping from its JSON config file."""
    raw_text = path.read_text()
    return json.loads(raw_text)


def load_inverted_protocol_mapping() -> dict:
    """Invert the protocol mapping to known-string -> list of protocol ids.

    Each protocol's 'known-strings' aliases become keys pointing back at the
    protocol(s) that declared them.  Returns a defaultdict(list), so looking
    up an unseen string yields an empty list.
    """
    inverted: dict = defaultdict(list)
    for protocol_id, metadata in load_protocol_mapping().items():
        for alias in metadata.get('known-strings', []):
            inverted[alias].append(protocol_id)
    return inverted


def load_column_mapping(*, registry_name: str, download_type: str, mapping_path: str) -> dict:
    """Return the credit column mapping for one registry/download-type pair.

    Raises KeyError if either key is absent from the JSON config.
    """
    with open(mapping_path) as fp:
        all_mappings = json.load(fp)
    registry_mappings = all_mappings[registry_name]
    return registry_mappings[download_type]


@pf.register_dataframe_method
def set_registry(df: pd.DataFrame, registry_name: str) -> pd.DataFrame:
    """Set a constant 'registry' column on *df* (mutates in place) and return it.

    NOTE(review): unlike the other helpers in this module, registry_name is
    not keyword-only here, although callers pass it as a keyword.
    """
    df['registry'] = registry_name
    return df


@pf.register_dataframe_method
def convert_to_datetime(
    df: pd.DataFrame, *, columns: list, date_format: str = 'mixed', utc: bool = True
) -> pd.DataFrame:
    """Parse the named columns into (by default UTC) datetimes, in place.

    Raises KeyError at the first missing column; columns earlier in the list
    will already have been converted by then.
    """
    for name in columns:
        if name not in df.columns:
            raise KeyError(f"The column '{name}' is missing.")
        df[name] = pd.to_datetime(df[name], format=date_format, utc=utc)
    return df


@pf.register_dataframe_method
def add_missing_columns(df: pd.DataFrame, *, columns: list[str]) -> pd.DataFrame:
    """Ensure every name in *columns* exists on *df*, filling new ones with None."""
    missing = [name for name in columns if name not in df.columns]
    for name in missing:
        df.loc[:, name] = None
    return df


@pf.register_dataframe_method
def validate(df: pd.DataFrame, schema: pa.DataFrameSchema) -> pd.DataFrame:
    """Validate *df* against *schema* and return only the schema's columns.

    Columns are returned in sorted name order so output layout is
    deterministic regardless of input ordering.
    """
    validated = schema.validate(df)
    ordered = sorted(schema.columns.keys())
    return validated[ordered]


@pf.register_dataframe_method
def clean_and_convert_numeric_columns(df: pd.DataFrame, *, columns: list[str]) -> pd.DataFrame:
    """Strip thousands separators and coerce the named columns to numeric, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose *columns* hold string-typed numbers like '1,234'.
    columns : list[str]
        Column names to clean and convert.

    Returns
    -------
    pd.DataFrame
        The same frame (mutated in place) with numeric columns; values that
        fail to parse become NaN via errors='coerce'.
    """
    for column in columns:
        # ',' is a literal character, not a pattern — regex=False states that
        # intent and avoids an unnecessary regex pass (original used regex=True).
        df[column] = df[column].str.replace(',', '', regex=False)
        df[column] = pd.to_numeric(df[column], errors='coerce')
    return df
Loading