diff --git a/src/pudl/ferc_to_sqlite/cli.py b/src/pudl/ferc_to_sqlite/cli.py index f6b1d72f54..9dd6251f46 100644 --- a/src/pudl/ferc_to_sqlite/cli.py +++ b/src/pudl/ferc_to_sqlite/cli.py @@ -115,6 +115,7 @@ def get_ferc_to_sqlite_job(): @click.option( "--gcs-cache-path", type=str, + default="", help=( "Load cached inputs from Google Cloud Storage if possible. This is usually " "much faster and more reliable than downloading from Zenodo directly. The " diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py index 8d4249063c..2a8cac4f32 100644 --- a/src/pudl/io_managers.py +++ b/src/pudl/io_managers.py @@ -1,5 +1,6 @@ """Dagster IO Managers.""" import json +import os import re from pathlib import Path from sqlite3 import sqlite_version @@ -17,6 +18,7 @@ OutputContext, UPathIOManager, io_manager, + ConfigurableIOManager, ) from packaging import version from sqlalchemy.exc import SQLAlchemyError @@ -375,6 +377,84 @@ def load_input(self, context: InputContext) -> pd.DataFrame: return df +# TODO(rousik): we can add env variable that will redirect read/writes between sqlite and parquet +# formats. Alternatively, we could also just do this choice in the resource instantiation and use +# a different class altogether for this. + +class PudlParquetIOManager(IOManager): + base_dir: str + + def __init__(self, base_dir: str): + """Initializes PUDL parquet IO manager. + + Creates base_dir where parquet files will be written. + """ + self.base_dir = base_dir + Path(self.base_dir).mkdir(parents=True, exist_ok=True) + + def _get_table_name(self, context) -> str: + """Get asset name from dagster context object.""" + if context.has_asset_key: + table_name = context.asset_key.to_python_identifier() + else: + table_name = context.get_identifier() + return table_name + + def handle_output(self, context: OutputContext, obj: pd.DataFrame | str): + """Handle an op or asset output. + + If the output is a dataframe, write it to the database. If it is a string + execute it as a SQL query. + + Args: + context: dagster keyword that provides access output information like asset + name. + obj: a sql query or dataframe to add to the database. + + Raises: + Exception: if an asset or op returns an unsupported datatype. + """ + if isinstance(obj, str): + raise Exception( + f"PudlParquetIOManager only supports pandas DataFrames, got: {str(obj)}" + ) + assert isinstance(obj, pd.DataFrame) + table_name = self._get_table_name(context) + parquet_path = Path(self.base_dir) / f"{table_name}.parquet" + row_count = len(obj) + context.log.info(f"Row count for table {table_name}: {row_count}") + obj.to_parquet(path=parquet_path.as_posix(), index=False) + + def load_input(self, context: InputContext) -> pd.DataFrame: + """Load a dataframe from a sqlite database. + + Args: + context: dagster keyword that provides access output information like asset + name. + """ + table_name = self._get_table_name(context) + + # Check if there is a Resource in self.package for table_name + try: + res = self.package.get_resource(table_name) + except ValueError: + raise ValueError( + f"{table_name} does not appear in pudl.metadata.resources. " + "Check for typos, or add the table to the metadata and recreate the " + f"PUDL SQlite database. It's also possible that {table_name} is one of " + "the tables that does not get loaded into the PUDL SQLite DB because " + "it's a work in progress or is distributed in Apache Parquet format." + ) + df = res.enforce_schema( + pd.read_parquet(path=(self.base_dir / f"{table_name}.parquet").as_posix()) + ) + if df.empty: + raise AssertionError( + f"The {table_name} table is empty. Materialize the {table_name} " + "asset so it is available in the database." + ) + return df + class PudlSQLiteIOManager(SQLiteIOManager): """IO Manager that writes and retrieves dataframes from a SQLite database. @@ -414,6 +494,8 @@ def __init__( exception, if the database is locked by another connection. If another connection opens a transaction to modify the database, it will be locked until that transaction is committed. + use_parquet: if True, instead of actually writing to sqlite databases, we will + use parquet format. """ if package is None: package = Package.from_resource_ids() @@ -425,7 +507,7 @@ def __init__( f"{sqlite_path} not initialized! Run `alembic upgrade head`." ) - super().__init__(base_dir, db_name, md, timeout) + super().__init__(base_dir, db_name, md, timeout, use_parquet=use_parquet) existing_schema_context = MigrationContext.configure(self.engine.connect()) metadata_diff = compare_metadata(existing_schema_context, self.md) @@ -538,6 +620,13 @@ def load_input(self, context: InputContext) -> pd.DataFrame: @io_manager def pudl_sqlite_io_manager(init_context) -> PudlSQLiteIOManager: """Create a SQLiteManager dagster resource for the pudl database.""" + # TODO(rousik): this is now controlled with env variable, but that should be turned into + # dagster configuration value. + if os.environ.get("PUDL_USE_PARQUET", "").lower() == "true": + logger.info("PudlIOManager: Using parquet") + return PudlParquetIOManager(base_dir=(PudlPaths().output_dir / "pudl").as_posix()) + + logger.info("PudlIOManager: using sqlite") return PudlSQLiteIOManager(base_dir=PudlPaths().output_dir, db_name="pudl")