Experimental support for exporting to parquet files.
rousik committed Jan 6, 2024
1 parent 68eed0d commit 8b4fed4
Showing 2 changed files with 91 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/pudl/ferc_to_sqlite/cli.py
@@ -115,6 +115,7 @@ def get_ferc_to_sqlite_job():
@click.option(
    "--gcs-cache-path",
    type=str,
    default="",
    help=(
        "Load cached inputs from Google Cloud Storage if possible. This is usually "
        "much faster and more reliable than downloading from Zenodo directly. The "
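Worth noting on the one-line cli.py change: with default="", the option value is always a string, so downstream code can use simple truthiness to tell whether a GCS cache was configured, with no None checks needed. A minimal sketch of that pattern (the helper name is illustrative, not from the PUDL codebase):

def gcs_cache_enabled(gcs_cache_path: str) -> bool:
    # With default="", the click option is never None, so truthiness alone
    # distinguishes "cache configured" from "not configured".
    return bool(gcs_cache_path)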
91 changes: 90 additions & 1 deletion src/pudl/io_managers.py
@@ -1,5 +1,6 @@
"""Dagster IO Managers."""
import json
import os
import re
from pathlib import Path
from sqlite3 import sqlite_version
@@ -17,6 +18,7 @@
    OutputContext,
    UPathIOManager,
    io_manager,
    ConfigurableIOManager,
)
from packaging import version
from sqlalchemy.exc import SQLAlchemyError
@@ -375,6 +377,84 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
        return df


# TODO(rousik): we could add an env variable that redirects reads/writes between the
# sqlite and parquet formats. Alternatively, we could make this choice at resource
# instantiation and use a different class altogether.

class PudlParquetIOManager(IOManager):
    """IO Manager that writes PUDL tables out to parquet files."""

    base_dir: str

    def __init__(self, base_dir: str):
        """Initializes PUDL parquet IO manager.

        Creates base_dir where parquet files will be written.
        """
        self.base_dir = base_dir
        # load_input() looks table schemas up in the PUDL metadata package.
        self.package = Package.from_resource_ids()
        Path(self.base_dir).mkdir(parents=True, exist_ok=True)

    def _get_table_name(self, context) -> str:
        """Get asset name from dagster context object."""
        if context.has_asset_key:
            table_name = context.asset_key.to_python_identifier()
        else:
            table_name = context.get_identifier()
        return table_name

    def handle_output(self, context: OutputContext, obj: pd.DataFrame | str):
        """Handle an op or asset output.

        If the output is a dataframe, write it out to a parquet file. SQL
        statements are not supported by this IO manager.

        Args:
            context: dagster keyword that provides access to output information
                like asset name.
            obj: a dataframe to write out to a parquet file.

        Raises:
            Exception: if an asset or op returns an unsupported datatype.
        """
        if isinstance(obj, str):
            raise Exception(
                f"PudlParquetIOManager only supports pandas DataFrames, got: {obj}"
            )
        assert isinstance(obj, pd.DataFrame)
        table_name = self._get_table_name(context)
        parquet_path = Path(self.base_dir) / f"{table_name}.parquet"
        row_count = len(obj)
        context.log.info(f"Row count for table {table_name}: {row_count}")
        obj.to_parquet(path=parquet_path.as_posix(), index=False)

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load a dataframe from a parquet file.

        Args:
            context: dagster keyword that provides access to output information
                like asset name.
        """
        table_name = self._get_table_name(context)

        # Check if there is a Resource in self.package for table_name.
        try:
            res = self.package.get_resource(table_name)
        except ValueError:
            raise ValueError(
                f"{table_name} does not appear in pudl.metadata.resources. "
                "Check for typos, or add the table to the metadata and recreate "
                f"the outputs. It's also possible that {table_name} is one of "
                "the tables that does not get loaded into the PUDL SQLite DB "
                "because it's a work in progress or is distributed in Apache "
                "Parquet format."
            )
        df = res.enforce_schema(
            # base_dir is stored as a str, so wrap it in Path before joining.
            pd.read_parquet(
                path=(Path(self.base_dir) / f"{table_name}.parquet").as_posix()
            )
        )
        if df.empty:
            raise AssertionError(
                f"The {table_name} table is empty. Materialize the {table_name} "
                "asset so its parquet file is available."
            )
        return df

class PudlSQLiteIOManager(SQLiteIOManager):
"""IO Manager that writes and retrieves dataframes from a SQLite database.
Expand Down Expand Up @@ -414,6 +494,8 @@ def __init__(
                exception, if the database is locked by another connection. If another
                connection opens a transaction to modify the database, it will be locked
                until that transaction is committed.
            use_parquet: if True, write parquet files instead of the SQLite database.
        """
        if package is None:
            package = Package.from_resource_ids()
@@ -425,7 +507,7 @@ def __init__(
f"{sqlite_path} not initialized! Run `alembic upgrade head`."
)

-        super().__init__(base_dir, db_name, md, timeout)
+        super().__init__(base_dir, db_name, md, timeout, use_parquet=use_parquet)

        existing_schema_context = MigrationContext.configure(self.engine.connect())
        metadata_diff = compare_metadata(existing_schema_context, self.md)
@@ -538,6 +620,13 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
@io_manager
def pudl_sqlite_io_manager(init_context) -> IOManager:
    """Create a dagster IO manager resource for the pudl database."""
    # TODO(rousik): this is currently controlled with an env variable, but it
    # should be turned into a dagster configuration value.
    if os.environ.get("PUDL_USE_PARQUET", "").lower() == "true":
        logger.info("PudlIOManager: using parquet")
        return PudlParquetIOManager(
            base_dir=(PudlPaths().output_dir / "pudl").as_posix()
        )

    logger.info("PudlIOManager: using sqlite")
    return PudlSQLiteIOManager(base_dir=PudlPaths().output_dir, db_name="pudl")
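A hedged usage sketch of the new toggle, assuming the standard PUDL_OUTPUT environment variable and a placeholder table name (neither is confirmed by this diff): set PUDL_USE_PARQUET before materializing assets, and each table lands under $PUDL_OUTPUT/pudl/<table_name>.parquet instead of in the SQLite database.

import os
from pathlib import Path

import pandas as pd

# Assumption: assets were already materialized with PUDL_USE_PARQUET=true,
# so the parquet files exist under $PUDL_OUTPUT/pudl/.
os.environ["PUDL_USE_PARQUET"] = "true"  # routes pudl_sqlite_io_manager to parquet

out_dir = Path(os.environ["PUDL_OUTPUT"]) / "pudl"  # mirrors PudlPaths().output_dir / "pudl"
df = pd.read_parquet(out_dir / "plants_entity_eia.parquet")  # placeholder table name
print(df.shape)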


