diff --git a/migrations/versions/2e5b623ab40b_create_intermediate_steam_plants_table_.py b/migrations/versions/2e5b623ab40b_create_intermediate_steam_plants_table_.py new file mode 100644 index 0000000000..4cfc3a559a --- /dev/null +++ b/migrations/versions/2e5b623ab40b_create_intermediate_steam_plants_table_.py @@ -0,0 +1,31 @@ +"""create intermediate steam plants table with plant ids + +Revision ID: 2e5b623ab40b +Revises: 4b08158ae952 +Create Date: 2023-12-19 17:37:33.476337 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '2e5b623ab40b' +down_revision = '4b08158ae952' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('core_ferc1__yearly_steam_plants_sched402', schema=None) as batch_op: + batch_op.drop_column('plant_id_ferc1') + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('core_ferc1__yearly_steam_plants_sched402', schema=None) as batch_op: + batch_op.add_column(sa.Column('plant_id_ferc1', sa.INTEGER(), nullable=True)) + + # ### end Alembic commands ### diff --git a/src/pudl/analysis/__init__.py b/src/pudl/analysis/__init__.py index 44b2019e7a..10faa4bcf4 100644 --- a/src/pudl/analysis/__init__.py +++ b/src/pudl/analysis/__init__.py @@ -8,9 +8,12 @@ from . import ( allocate_gen_fuel, eia_ferc1_record_linkage, + eia_ferc1_train, epacamd_eia, + fuel_by_plant, mcoe, plant_parts_eia, + record_linkage, service_territory, spatial, state_demand, diff --git a/src/pudl/analysis/classify_plants_ferc1.py b/src/pudl/analysis/classify_plants_ferc1.py deleted file mode 100644 index 227f926747..0000000000 --- a/src/pudl/analysis/classify_plants_ferc1.py +++ /dev/null @@ -1,762 +0,0 @@ -"""Scikit-Learn classification pipeline for identifying related FERC 1 plant records. - -Sadly FERC doesn't provide any kind of real IDs for the plants that report to them -- -all we have is their names (a freeform string) and the data that is reported alongside -them. This is often enough information to be able to recognize which records ought to be -associated with each other year to year to create a continuous time series. However, we -want to do that programmatically, which means using some clustering / categorization -tools from scikit-learn -""" - -import re -from difflib import SequenceMatcher - -import networkx as nx # Used to knit incomplete ferc plant time series together. -import numpy as np -import pandas as pd -from sklearn.base import BaseEstimator, ClassifierMixin -from sklearn.compose import ColumnTransformer -from sklearn.feature_extraction.text import TfidfVectorizer - -# These modules are required for the FERC Form 1 Plant ID & Time Series -from sklearn.metrics.pairwise import cosine_similarity -from sklearn.pipeline import Pipeline -from sklearn.preprocessing import MinMaxScaler, Normalizer, OneHotEncoder - -import pudl - -logger = pudl.logging_helpers.get_logger(__name__) - - -class FERCPlantClassifier(BaseEstimator, ClassifierMixin): - """A classifier for identifying FERC plant time series in FERC Form 1 data. - - We want to be able to give the classifier a FERC plant record, and get back the - group of records(or the ID of the group of records) that it ought to be part of. - - There are hundreds of different groups of records, and we can only know what they - are by looking at the whole dataset ahead of time. 
This is the "fitting" step, in - which the groups of records resulting from a particular set of model parameters(e.g. - the weights that are attributes of the class) are generated. - - Once we have that set of record categories, we can test how well the classifier - performs, by checking it against test / training data which we have already - classified by hand. The test / training set is a list of lists of unique FERC plant - record IDs(each record ID is the concatenation of: report year, respondent id, - supplement number, and row number). It could also be stored as a dataframe where - each column is associated with a year of data(some of which could be empty). Not - sure what the best structure would be. - - If it's useful, we can assign each group a unique ID that is the time ordered - concatenation of each of the constituent record IDs. Need to understand what the - process for checking the classification of an input record looks like. - - To score a given classifier, we can look at what proportion of the records in the - test dataset are assigned to the same group as in our manual classification of those - records. There are much more complicated ways to do the scoring too... but for now - let's just keep it as simple as possible. - """ - - def __init__(self, plants_df: pd.DataFrame, min_sim: float = 0.75) -> None: - """Initialize the classifier. - - Args: - plants_df: The entire FERC Form 1 plant table as a dataframe. Needed in - order to calculate the distance metrics between all of the records so we - can group the plants in the fit() step, so we can check how well they - are categorized later... - min_sim: Number between 0.0 and 1.0, indicating the minimum value of - cosine similarity that we are willing to accept as indicating two - records are part of the same plant record time series. All entries in - the pairwise similarity matrix below this value will be zeroed out. - """ - self.min_sim = min_sim - self.plants_df = plants_df - self._years = self.plants_df.report_year.unique() # could we list() here? - - def fit(self, X, y=None) -> "FERCPlantClassifier": # noqa: N803 - """Use weighted FERC plant features to group records into time series. - - The fit method takes the vectorized, normalized, weighted FERC plant - features (X) as input, calculates the pairwise cosine similarity matrix - between all records, and groups the records in their best time series. - The similarity matrix and best time series are stored as data members - in the object for later use in scoring & predicting. - - This isn't quite the way a fit method would normally work. - - Args: - X: a sparse matrix of size n_samples x n_features. - y: Included only for API compatibility. - """ - self._cossim_df = pd.DataFrame(cosine_similarity(X)) - self._best_of = self._best_by_year() - # Make the best match indices integers rather than floats w/ NA values. - self._best_of[list(self._years)] = ( - self._best_of[list(self._years)].fillna(-1).astype(int) - ) - - return self - - def transform(self, X, y=None): # noqa: N803 - """Passthrough transform method -- just returns self.""" - return self - - def predict(self, X, y=None): # noqa: N803 - """Identify time series of similar records to input record_ids. - - Given a one-dimensional dataframe X, containing FERC record IDs, return - a dataframe in which each row corresponds to one of the input record_id - values (ordered as the input was ordered), with each column - corresponding to one of the years worth of data. 
Values in the returned - dataframe are the FERC record_ids of the record most similar to the - input record within that year. Some of them may be null, if there was - no sufficiently good match. - - Row index is the seed record IDs. Column index is years. - - Todo: - * This method is hideously inefficient. It should be vectorized. - * There's a line that throws a FutureWarning that needs to be fixed. - """ - try: - getattr(self, "_cossim_df") - except AttributeError: - raise RuntimeError("You must train classifer before predicting data!") - - tmp_best = pd.concat( - [ - self._best_of.loc[:, ["record_id"] + list(self._years)], - pd.DataFrame(data=[""], index=[-1], columns=["record_id"]), - ] - ) - out_dfs = [] - # For each record_id we've been given: - for x in X: - # Find the index associated with the record ID we are predicting - # a grouping for: - idx = tmp_best[tmp_best.record_id == x].index.to_numpy()[0] - - # Mask the best_of dataframe, keeping only those entries where - # the index of the chosen record_id appears -- this results in a - # huge dataframe almost full of NaN values. - w_m = ( - tmp_best[self._years][tmp_best[self._years] == idx] - # Grab the index values of the rows in the masked dataframe which - # are NOT all NaN -- these are the indices of the *other* records - # which found the record x to be one of their best matches. - .dropna(how="all") - .index.to_numpy() - ) - - # Now look up the indices of the records which were found to be - # best matches to the record x. - b_m = tmp_best.loc[idx, self._years].astype(int) - - # Here we require that there is no conflict between the two sets - # of indices -- that every time a record shows up in a grouping, - # that grouping is either the same, or a subset of the other - # groupings that it appears in. When no sufficiently good match - # is found the "index" in the _best_of array is set to -1, so - # requiring that the b_m value be >=0 screens out those no-match - # cases. This is okay -- we're just trying to require that the - # groupings be internally self-consistent, not that they are - # completely identical. Being flexible on this dramatically - # increases the number of records that get assigned a plant ID. - if np.array_equiv(w_m, b_m[b_m >= 0].to_numpy()): - # This line is causing a warning. In cases where there are - # some years no sufficiently good match exists, and so b_m - # doesn't contain an index. Instead, it has a -1 sentinel - # value, which isn't a label for which a record exists, which - # upsets .loc. Need to find some way around this... but for - # now it does what we want. We could use .iloc instead, but - # then the -1 sentinel value maps to the last entry in the - # dataframe, which also isn't what we want. Blargh. - new_grp = tmp_best.loc[b_m, "record_id"] - - # Stack the new list of record_ids on our output DataFrame: - out_dfs.append( - pd.DataFrame( - data=new_grp.to_numpy().reshape(1, len(self._years)), - index=pd.Index( - [tmp_best.loc[idx, "record_id"]], name="seed_id" - ), - columns=self._years, - ) - ) - return pd.concat(out_dfs) - - def score(self, X, y=None): # noqa: N803 - """Scores a collection of FERC plant categorizations. - - For every record ID in X, predict its record group and calculate - a metric of similarity between the prediction and the "ground - truth" group that was passed in for that value of X. - - Args: - X (pandas.DataFrame): an n_samples x 1 pandas dataframe of FERC - Form 1 record IDs. 
- y (pandas.DataFrame): a dataframe of "ground truth" FERC Form 1 - record groups, corresponding to the list record IDs in X - - Returns: - numpy.ndarray: The average of all the similarity metrics as the - score. - """ - scores = [] - for true_group in y: - true_group = str.split(true_group, sep=",") - true_group = [s for s in true_group if s != ""] - predicted_groups = self.predict(pd.DataFrame(true_group)) - for rec_id in true_group: - sm = SequenceMatcher(None, true_group, predicted_groups.loc[rec_id]) - scores = scores + [sm.ratio()] - - return np.mean(scores) - - def _best_by_year(self): - """Finds the best match for each plant record in each other year.""" - # only keep similarity matrix entries above our minimum threshold: - out_df = self.plants_df.copy() - sim_df = self._cossim_df[self._cossim_df >= self.min_sim] - - # Add a column for each of the years, in which we will store indices - # of the records which best match the record in question: - for yr in self._years: - newcol = yr - out_df[newcol] = -1 - - # seed_yr is the year we are matching *from* -- we do the entire - # matching process from each year, since it may not be symmetric: - for seed_yr in self._years: - seed_idx = self.plants_df.index[self.plants_df.report_year == seed_yr] - # match_yr is all the other years, in which we are finding the best - # match - for match_yr in self._years: - best_of_yr = match_yr - match_idx = self.plants_df.index[self.plants_df.report_year == match_yr] - # For each record specified by seed_idx, obtain the index of - # the record within match_idx that that is the most similar. - best_idx = sim_df.iloc[seed_idx, match_idx].idxmax(axis=1) - out_df.iloc[seed_idx, out_df.columns.get_loc(best_of_yr)] = best_idx - - return out_df - - -def make_ferc1_clf( - plants_df, - ngram_min=2, - ngram_max=10, - min_sim=0.75, - plant_name_ferc1_wt=2.0, - plant_type_wt=2.0, - construction_type_wt=1.0, - capacity_mw_wt=1.0, - construction_year_wt=1.0, - utility_id_ferc1_wt=1.0, - fuel_fraction_wt=1.0, -): - """Create a FERC Plant Classifier using several weighted features. - - Given a FERC steam plants dataframe plants_df, which also includes fuel consumption - information, transform a selection of useful columns into features suitable for use - in calculating inter-record cosine similarities. Individual features are weighted - according to the keyword arguments. - - Features include: - - * plant_name (via TF-IDF, with ngram_min and ngram_max as parameters) - * plant_type (OneHot encoded categorical feature) - * construction_type (OneHot encoded categorical feature) - * capacity_mw (MinMax scaled numerical feature) - * construction year (OneHot encoded categorical feature) - * utility_id_ferc1 (OneHot encoded categorical feature) - * fuel_fraction_mmbtu (several MinMax scaled numerical columns, which are - normalized and treated as a single feature.) - - This feature matrix is then used to instantiate a FERCPlantClassifier. - - The combination of the ColumnTransformer and FERCPlantClassifier are combined in a - sklearn Pipeline, which is returned by the function. - - Arguments: - ngram_min (int): the minimum lengths to consider in the vectorization of the - plant_name feature. - ngram_max (int): the maximum n-gram lengths to consider in the vectorization of - the plant_name feature. - min_sim (float): the minimum cosine similarity between two records that can be - considered a "match" (a number between 0.0 and 1.0). 
- plant_name_ferc1_wt (float): weight used to determine the relative importance - of each of the features in the feature matrix used to calculate the cosine - similarity between records. Used to scale each individual feature before the - vectors are normalized. - plant_type_wt (float): weight used to determine the relative importance of each - of the features in the feature matrix used to calculate the cosine - similarity between records. Used to scale each individual feature before the - vectors are normalized. - construction_type_wt (float): weight used to determine the relative importance - of each of the features in the feature matrix used to calculate the cosine - similarity between records. Used to scale each individual feature before the - vectors are normalized. - capacity_mw_wt (float):weight used to determine the relative importance of each - of the features in the feature matrix used to calculate the cosine - similarity between records. Used to scale each individual feature before the - vectors are normalized. - construction_year_wt (float): weight used to determine the relative importance - of each of the features in the feature matrix used to calculate the cosine - similarity between records. Used to scale each individual feature before the - vectors are normalized. - utility_id_ferc1_wt (float): weight used to determine the relative importance - of each of the features in the feature matrix used to calculate the cosine - similarity between records. Used to scale each individual feature before the - vectors are normalized. - fuel_fraction_wt (float): weight used to determine the relative importance of - each of the features in the feature matrix used to calculate the cosine - similarity between records. Used to scale each individual feature before the - vectors are normalized. - - Returns: - sklearn.pipeline.Pipeline: an sklearn Pipeline that performs reprocessing and - classification with a FERCPlantClassifier object. - """ - # Make a list of all the fuel fraction columns for use as one feature. 
- fuel_cols = list(plants_df.filter(regex=".*_fraction_mmbtu$").columns) - - ferc1_pipe = Pipeline( - [ - ( - "preprocessor", - ColumnTransformer( - transformers=[ - ( - "plant_name_ferc1", - TfidfVectorizer( - analyzer="char", ngram_range=(ngram_min, ngram_max) - ), - "plant_name_ferc1", - ), - ( - "plant_type", - OneHotEncoder(categories="auto"), - ["plant_type"], - ), - ( - "construction_type", - OneHotEncoder(categories="auto"), - ["construction_type"], - ), - ("capacity_mw", MinMaxScaler(), ["capacity_mw"]), - ( - "construction_year", - OneHotEncoder(categories="auto"), - ["construction_year"], - ), - ( - "utility_id_ferc1", - OneHotEncoder(categories="auto"), - ["utility_id_ferc1"], - ), - ( - "fuel_fraction_mmbtu", - Pipeline( - [("scaler", MinMaxScaler()), ("norm", Normalizer())] - ), - fuel_cols, - ), - ], - transformer_weights={ - "plant_name_ferc1": plant_name_ferc1_wt, - "plant_type": plant_type_wt, - "construction_type": construction_type_wt, - "capacity_mw": capacity_mw_wt, - "construction_year": construction_year_wt, - "utility_id_ferc1": utility_id_ferc1_wt, - "fuel_fraction_mmbtu": fuel_fraction_wt, - }, - ), - ), - ( - "classifier", - FERCPlantClassifier(min_sim=min_sim, plants_df=plants_df), - ), - ] - ) - return ferc1_pipe - - -def plants_steam_assign_plant_ids( - ferc1_steam_df: pd.DataFrame, - ferc1_fuel_df: pd.DataFrame, - fuel_categories: list[str], -) -> pd.DataFrame: - """Assign IDs to the large steam plants.""" - ########################################################################### - # FERC PLANT ID ASSIGNMENT - ########################################################################### - # Now we need to assign IDs to the large steam plants, since FERC doesn't - # do this for us. - logger.info("Identifying distinct large FERC plants for ID assignment.") - - # scikit-learn still doesn't deal well with NA values (this will be fixed - # eventually) We need to massage the type and missing data for the - # Classifier to work. - ferc1_steam_df = pudl.helpers.fix_int_na( - ferc1_steam_df, columns=["construction_year"] - ) - - # Grab fuel consumption proportions for use in assigning plant IDs: - fuel_fractions = fuel_by_plant_ferc1(ferc1_fuel_df, fuel_categories) - ffc = list(fuel_fractions.filter(regex=".*_fraction_mmbtu$").columns) - - ferc1_steam_df = ferc1_steam_df.merge( - fuel_fractions[["utility_id_ferc1", "plant_name_ferc1", "report_year"] + ffc], - on=["utility_id_ferc1", "plant_name_ferc1", "report_year"], - how="left", - ) - # We need to fill the null values for these numerical feature vectors with - # zeros. not ideal, but the model requires dealing with nulls - null_to_zero = ffc + ["capacity_mw"] - ferc1_steam_df[null_to_zero] = ferc1_steam_df[null_to_zero].fillna(value=0.0) - # fillin these two str columns with empty strings for the model - str_cols = ["plant_type", "construction_type"] - ferc1_steam_df[str_cols] = ferc1_steam_df[str_cols].fillna(value="") - # Train the classifier using DEFAULT weights, parameters not listed here. - ferc1_clf = make_ferc1_clf(ferc1_steam_df).fit_transform(ferc1_steam_df) - - # Use the classifier to generate groupings of similar records: - record_groups = ferc1_clf.predict(ferc1_steam_df.record_id) - n_tot = len(ferc1_steam_df) - n_grp = len(record_groups) - pct_grp = n_grp / n_tot - logger.info( - f"Successfully associated {n_grp} of {n_tot} ({pct_grp:.2%}) " - f"FERC Form 1 plant records with multi-year plant entities." 
- ) - - record_groups.columns = record_groups.columns.astype(str) - cols = record_groups.columns - record_groups = record_groups.reset_index() - - # Now we are going to create a graph (network) that describes all of the - # binary relationships between a seed_id and the record_ids that it has - # been associated with in any other year. Each connected component of that - # graph is a ferc plant time series / plant_id - logger.info("Assigning IDs to multi-year FERC plant entities.") - edges_df = pd.DataFrame(columns=["source", "target"]) - for col in cols: - new_edges = record_groups[["seed_id", col]] - new_edges = new_edges.rename({"seed_id": "source", col: "target"}, axis=1) - edges_df = pd.concat([edges_df, new_edges], sort=True) - - # Drop any records where there's no target ID (no match in a year) - edges_df = edges_df[edges_df.target != ""] - - # We still have to deal with the orphaned records -- any record which - # wasn't place in a time series but is still valid should be included as - # its own independent "plant" for completeness, and use in aggregate - # analysis. - orphan_record_ids = np.setdiff1d( - ferc1_steam_df.record_id.unique(), record_groups.to_numpy().flatten() - ) - logger.info( - f"Identified {len(orphan_record_ids)} orphaned FERC plant records. " - f"Adding orphans to list of plant entities." - ) - orphan_df = pd.DataFrame({"source": orphan_record_ids, "target": orphan_record_ids}) - edges_df = pd.concat([edges_df, orphan_df], sort=True) - - # Use the data frame we've compiled to create a graph - G = nx.from_pandas_edgelist( # noqa: N806 - edges_df, source="source", target="target" - ) - # Find the connected components of the graph - ferc1_plants = (G.subgraph(c) for c in nx.connected_components(G)) - - # Now we'll iterate through the connected components and assign each of - # them a FERC Plant ID, and pull the results back out into a dataframe: - plants_w_ids = [] - for plant_id_ferc1, plant in enumerate(ferc1_plants): - nx.set_edge_attributes(plant, plant_id_ferc1 + 1, name="plant_id_ferc1") - new_plant_df = nx.to_pandas_edgelist(plant) - plants_w_ids.append(new_plant_df) - plants_w_ids = pd.concat(plants_w_ids) - logger.info( - f"Successfully Identified {plant_id_ferc1+1-len(orphan_record_ids)} " - f"multi-year plant entities." - ) - - # Set the construction year back to numeric because it is. - ferc1_steam_df["construction_year"] = pd.to_numeric( - ferc1_steam_df["construction_year"], errors="coerce" - ) - # We don't actually want to save the fuel fractions in this table... they - # were only here to help us match up the plants. - ferc1_steam_df = ferc1_steam_df.drop(ffc, axis=1) - - # Now we need a list of all the record IDs, with their associated - # FERC 1 plant IDs. 
However, the source-target listing isn't - # guaranteed to list every one of the nodes in either list, so we - # need to compile them together to ensure that we get every single - sources = ( - plants_w_ids.drop("target", axis=1) - .drop_duplicates() - .rename({"source": "record_id"}, axis=1) - ) - targets = ( - plants_w_ids.drop("source", axis=1) - .drop_duplicates() - .rename({"target": "record_id"}, axis=1) - ) - plants_w_ids = ( - pd.concat([sources, targets]) - .drop_duplicates() - .sort_values(["plant_id_ferc1", "record_id"]) - ) - steam_rids = ferc1_steam_df.record_id.to_numpy() - pwids_rids = plants_w_ids.record_id.to_numpy() - missing_ids = [rid for rid in steam_rids if rid not in pwids_rids] - if missing_ids: - raise AssertionError( - f"Uh oh, we lost {abs(len(steam_rids)-len(pwids_rids))} FERC " - f"steam plant record IDs: {missing_ids}" - ) - - ferc1_steam_df = pd.merge(ferc1_steam_df, plants_w_ids, on="record_id") - ferc1_steam_df = revert_filled_in_string_nulls(ferc1_steam_df) - return ferc1_steam_df - - -def revert_filled_in_string_nulls(df: pd.DataFrame) -> pd.DataFrame: - """Revert the filled nulls from string columns. - - Many columns that are used for the classification in - :func:`plants_steam_assign_plant_ids` have many nulls. The classifier can't handle - nulls well, so we filled in nulls with empty strings for string columns. This - function replaces empty strings with null values for specific columns that are known - to contain empty strings introduced for the classifier. - """ - for col in [ - "plant_type", - "construction_type", - "fuel_type_code_pudl", - "primary_fuel_by_cost", - "primary_fuel_by_mmbtu", - ]: - if col in df.columns: - # the replace to_replace={column_name: {"", pd.NA}} mysteriously doesn't work. - df[col] = df[col].replace( - to_replace=[""], - value=pd.NA, - ) - return df - - -def revert_filled_in_float_nulls(df: pd.DataFrame) -> pd.DataFrame: - """Revert the filled nulls from float columns. - - Many columns that are used for the classification in - :func:`plants_steam_assign_plant_ids` have many nulls. The classifier can't handle - nulls well, so we filled in nulls with zeros for float columns. This function - replaces zeros with nulls for all float columns. - """ - float_cols = list(df.select_dtypes(include=[float])) - if float_cols: - df.loc[:, float_cols] = df.loc[:, float_cols].replace(0, np.nan) - return df - - -def plants_steam_validate_ids(ferc1_steam_df: pd.DataFrame) -> pd.DataFrame: - """Tests that plant_id_ferc1 times series includes one record per year. - - Args: - ferc1_steam_df: A DataFrame of the data from the FERC 1 Steam table. - - Returns: - The input dataframe, to enable method chaining. - """ - ########################################################################## - # FERC PLANT ID ERROR CHECKING STUFF - ########################################################################## - - # Test to make sure that we don't have any plant_id_ferc1 time series - # which include more than one record from a given year. Warn the user - # if we find such cases (which... 
we do, as of writing) - year_dupes = ( - ferc1_steam_df.groupby(["plant_id_ferc1", "report_year"])["utility_id_ferc1"] - .count() - .reset_index() - .rename(columns={"utility_id_ferc1": "year_dupes"}) - .query("year_dupes>1") - ) - if len(year_dupes) > 0: - for dupe in year_dupes.itertuples(): - logger.error( - f"Found report_year={dupe.report_year} " - f"{dupe.year_dupes} times in " - f"plant_id_ferc1={dupe.plant_id_ferc1}" - ) - else: - logger.info("No duplicate years found in any plant_id_ferc1. Hooray!") - - return ferc1_steam_df - - -def fuel_by_plant_ferc1( - fuel_df: pd.DataFrame, fuel_categories: list[str], thresh: float = 0.5 -) -> pd.DataFrame: - """Calculates useful FERC Form 1 fuel metrics on a per plant-year basis. - - Each record in the FERC Form 1 corresponds to a particular type of fuel. Many plants - -- especially coal plants -- use more than one fuel, with gas and/or diesel serving - as startup fuels. In order to be able to classify the type of plant based on - relative proportions of fuel consumed or fuel costs it is useful to aggregate these - per-fuel records into a single record for each plant. - - Fuel cost (in nominal dollars) and fuel heat content (in mmBTU) are calculated for - each fuel based on the cost and heat content per unit, and the number of units - consumed, and then summed by fuel type (there can be more than one record for a - given type of fuel in each plant because we are simplifying the fuel categories). - The per-fuel records are then pivoted to create one column per fuel type. The total - is summed and stored separately, and the individual fuel costs & heat contents are - divided by that total, to yield fuel proportions. Based on those proportions and a - minimum threshold that's passed in, a "primary" fuel type is then assigned to the - plant-year record and given a string label. - - Args: - fuel_df: Pandas DataFrame resembling the post-transform - result for the core_ferc1__yearly_steam_plants_fuel_sched402 table. - thresh: A value between 0.5 and 1.0 indicating the minimum fraction of - overall heat content that must have been provided by a fuel in a plant-year - for it to be considered the "primary" fuel for the plant in that year. - Default value: 0.5. - - Returns: - DataFrame with a single record for each plant-year, including the columns - required to merge it with the :ref:`core_ferc1__yearly_steam_plants_sched402` - table/DataFrame (report_year, utility_id_ferc1, and plant_name) as well as - totals for fuel mmbtu consumed in that plant-year, and the cost of fuel in that - year, the proportions of heat content and fuel costs for each fuel in that year, - and a column that labels the plant's primary fuel for that year. - - Raises: - AssertionError: If the DataFrame input does not have the columns required to - run the function. - """ - keep_cols = [ - "report_year", # key - "utility_id_ferc1", # key - "plant_name_ferc1", # key - "fuel_type_code_pudl", # pivot - "fuel_consumed_units", # value - "fuel_mmbtu_per_unit", # value - "fuel_cost_per_unit_burned", # value - ] - - # Ensure that the dataframe we've gotten has all the information we need: - missing_cols = [col for col in keep_cols if col not in fuel_df.columns] - if missing_cols: - raise AssertionError( - f"Required columns not found in input fuel_df: {missing_cols}" - ) - - # Calculate per-fuel derived values and add them to the DataFrame - df = ( - # Really there should *not* be any duplicates here but... 
there's a - # bug somewhere that introduces them into the - # core_ferc1__yearly_steam_plants_fuel_sched402 table. - fuel_df[keep_cols] - .drop_duplicates() - # Calculate totals for each record based on per-unit values: - .assign(fuel_mmbtu=lambda x: x.fuel_consumed_units * x.fuel_mmbtu_per_unit) - .assign(fuel_cost=lambda x: x.fuel_consumed_units * x.fuel_cost_per_unit_burned) - # Drop the ratios and heterogeneous fuel "units" - .drop( - ["fuel_mmbtu_per_unit", "fuel_cost_per_unit_burned", "fuel_consumed_units"], - axis=1, - ) - # Group by the keys and fuel type, and sum: - .groupby( - [ - "utility_id_ferc1", - "plant_name_ferc1", - "report_year", - "fuel_type_code_pudl", - ], - observed=True, - ) - .sum() - .reset_index() - # Set the index to the keys, and pivot to get per-fuel columns: - .set_index(["utility_id_ferc1", "plant_name_ferc1", "report_year"]) - .pivot(columns="fuel_type_code_pudl") - .fillna(0.0) - ) - - # Undo pivot. Could refactor this old function - plant_year_totals = df.stack("fuel_type_code_pudl").groupby(level=[0, 1, 2]).sum() - - # Calculate total heat content burned for each plant, and divide it out - mmbtu_group = ( - pd.merge( - # Sum up all the fuel heat content, and divide the individual fuel - # heat contents by it (they are all contained in single higher - # level group of columns labeled fuel_mmbtu) - df.loc[:, "fuel_mmbtu"].div( - df.loc[:, "fuel_mmbtu"].sum(axis=1), axis="rows" - ), - # Merge that same total into the dataframe separately as well. - plant_year_totals.loc[:, "fuel_mmbtu"], - right_index=True, - left_index=True, - ) - .rename(columns=lambda x: re.sub(r"$", "_fraction_mmbtu", x)) - .rename(columns=lambda x: re.sub(r"_mmbtu_fraction_mmbtu$", "_mmbtu", x)) - ) - - # Calculate total fuel cost for each plant, and divide it out - cost_group = ( - pd.merge( - # Sum up all the fuel costs, and divide the individual fuel - # costs by it (they are all contained in single higher - # level group of columns labeled fuel_cost) - df.loc[:, "fuel_cost"].div(df.loc[:, "fuel_cost"].sum(axis=1), axis="rows"), - # Merge that same total into the dataframe separately as well. 
- plant_year_totals.loc[:, "fuel_cost"], - right_index=True, - left_index=True, - ) - .rename(columns=lambda x: re.sub(r"$", "_fraction_cost", x)) - .rename(columns=lambda x: re.sub(r"_cost_fraction_cost$", "_cost", x)) - ) - - # Re-unify the cost and heat content information: - df = pd.merge( - mmbtu_group, cost_group, left_index=True, right_index=True - ).reset_index() - - # Label each plant-year record by primary fuel: - df.loc[:, ["primary_fuel_by_cost", "primary_fuel_by_mmbtu"]] = pd.NA - df = df.astype( - { - "primary_fuel_by_cost": pd.StringDtype(), - "primary_fuel_by_mmbtu": pd.StringDtype(), - } - ) - for fuel_str in fuel_categories: - try: - mmbtu_mask = df[f"{fuel_str}_fraction_mmbtu"] > thresh - df.loc[mmbtu_mask, "primary_fuel_by_mmbtu"] = fuel_str - except KeyError: - pass - - try: - cost_mask = df[f"{fuel_str}_fraction_cost"] > thresh - df.loc[cost_mask, "primary_fuel_by_cost"] = fuel_str - except KeyError: - pass - - df[["primary_fuel_by_cost", "primary_fuel_by_mmbtu"]] = df[ - ["primary_fuel_by_cost", "primary_fuel_by_mmbtu"] - ].fillna("") - - return df diff --git a/src/pudl/analysis/fuel_by_plant.py b/src/pudl/analysis/fuel_by_plant.py new file mode 100644 index 0000000000..d781a06ed3 --- /dev/null +++ b/src/pudl/analysis/fuel_by_plant.py @@ -0,0 +1,192 @@ +"""Calculates useful FERC Form 1 fuel metrics on a per plant-year basis.""" + +import re + +import numpy as np +import pandas as pd + + +def revert_filled_in_string_nulls(df: pd.DataFrame) -> pd.DataFrame: + """Revert the filled nulls from string columns.""" + for col in [ + "plant_type", + "construction_type", + "fuel_type_code_pudl", + "primary_fuel_by_cost", + "primary_fuel_by_mmbtu", + ]: + if col in df.columns: + # the replace to_replace={column_name: {"", pd.NA}} mysteriously doesn't work. + df[col] = df[col].replace( + to_replace=[""], + value=pd.NA, + ) + return df + + +def revert_filled_in_float_nulls(df: pd.DataFrame) -> pd.DataFrame: + """Revert the filled nulls from float columns.""" + float_cols = list(df.select_dtypes(include=[float])) + if float_cols: + df.loc[:, float_cols] = df.loc[:, float_cols].replace(0, np.nan) + return df + + +def fuel_by_plant_ferc1( + fuel_df: pd.DataFrame, fuel_categories: list[str], thresh: float = 0.5 +) -> pd.DataFrame: + """Calculates useful FERC Form 1 fuel metrics on a per plant-year basis. + + Each record in the FERC Form 1 corresponds to a particular type of fuel. Many plants + -- especially coal plants -- use more than one fuel, with gas and/or diesel serving + as startup fuels. In order to be able to classify the type of plant based on + relative proportions of fuel consumed or fuel costs it is useful to aggregate these + per-fuel records into a single record for each plant. + + Fuel cost (in nominal dollars) and fuel heat content (in mmBTU) are calculated for + each fuel based on the cost and heat content per unit, and the number of units + consumed, and then summed by fuel type (there can be more than one record for a + given type of fuel in each plant because we are simplifying the fuel categories). + The per-fuel records are then pivoted to create one column per fuel type. The total + is summed and stored separately, and the individual fuel costs & heat contents are + divided by that total, to yield fuel proportions. Based on those proportions and a + minimum threshold that's passed in, a "primary" fuel type is then assigned to the + plant-year record and given a string label. 
+ + Args: + fuel_df: Pandas DataFrame resembling the post-transform + result for the fuel_ferc1 table. + thresh: A value between 0.5 and 1.0 indicating the minimum fraction of + overall heat content that must have been provided by a fuel in a plant-year + for it to be considered the "primary" fuel for the plant in that year. + Default value: 0.5. + + Returns: + DataFrame with a single record for each plant-year, including the columns + required to merge it with the plants_steam_ferc1 table/DataFrame (report_year, + utility_id_ferc1, and plant_name) as well as totals for fuel mmbtu consumed in + that plant-year, and the cost of fuel in that year, the proportions of heat + content and fuel costs for each fuel in that year, and a column that labels the + plant's primary fuel for that year. + + Raises: + AssertionError: If the DataFrame input does not have the columns required to + run the function. + """ + keep_cols = [ + "report_year", # key + "utility_id_ferc1", # key + "plant_name_ferc1", # key + "fuel_type_code_pudl", # pivot + "fuel_consumed_units", # value + "fuel_mmbtu_per_unit", # value + "fuel_cost_per_unit_burned", # value + ] + + # Ensure that the dataframe we've gotten has all the information we need: + missing_cols = [col for col in keep_cols if col not in fuel_df.columns] + if missing_cols: + raise AssertionError( + f"Required columns not found in input fuel_df: {missing_cols}" + ) + + # Calculate per-fuel derived values and add them to the DataFrame + df = ( + # Really there should *not* be any duplicates here but... there's a + # bug somewhere that introduces them into the fuel_ferc1 table. + fuel_df[keep_cols] + .drop_duplicates() + # Calculate totals for each record based on per-unit values: + .assign(fuel_mmbtu=lambda x: x.fuel_consumed_units * x.fuel_mmbtu_per_unit) + .assign(fuel_cost=lambda x: x.fuel_consumed_units * x.fuel_cost_per_unit_burned) + # Drop the ratios and heterogeneous fuel "units" + .drop( + ["fuel_mmbtu_per_unit", "fuel_cost_per_unit_burned", "fuel_consumed_units"], + axis=1, + ) + # Group by the keys and fuel type, and sum: + .groupby( + [ + "utility_id_ferc1", + "plant_name_ferc1", + "report_year", + "fuel_type_code_pudl", + ], + observed=True, + ) + .sum() + .reset_index() + # Set the index to the keys, and pivot to get per-fuel columns: + .set_index(["utility_id_ferc1", "plant_name_ferc1", "report_year"]) + .pivot(columns="fuel_type_code_pudl") + .fillna(0.0) + ) + + # Undo pivot. Could refactor this old function + plant_year_totals = df.stack("fuel_type_code_pudl").groupby(level=[0, 1, 2]).sum() + + # Calculate total heat content burned for each plant, and divide it out + mmbtu_group = ( + pd.merge( + # Sum up all the fuel heat content, and divide the individual fuel + # heat contents by it (they are all contained in single higher + # level group of columns labeled fuel_mmbtu) + df.loc[:, "fuel_mmbtu"].div( + df.loc[:, "fuel_mmbtu"].sum(axis=1), axis="rows" + ), + # Merge that same total into the dataframe separately as well. 
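+            # (Illustrative assumption: a plant-year burning 70 mmBTU of coal and
+            # 30 mmBTU of gas ends up with coal_fraction_mmbtu=0.7,
+            # gas_fraction_mmbtu=0.3, and a total fuel_mmbtu of 100.)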
+ plant_year_totals.loc[:, "fuel_mmbtu"], + right_index=True, + left_index=True, + ) + .rename(columns=lambda x: re.sub(r"$", "_fraction_mmbtu", x)) + .rename(columns=lambda x: re.sub(r"_mmbtu_fraction_mmbtu$", "_mmbtu", x)) + ) + + # Calculate total fuel cost for each plant, and divide it out + cost_group = ( + pd.merge( + # Sum up all the fuel costs, and divide the individual fuel + # costs by it (they are all contained in single higher + # level group of columns labeled fuel_cost) + df.loc[:, "fuel_cost"].div(df.loc[:, "fuel_cost"].sum(axis=1), axis="rows"), + # Merge that same total into the dataframe separately as well. + plant_year_totals.loc[:, "fuel_cost"], + right_index=True, + left_index=True, + ) + .rename(columns=lambda x: re.sub(r"$", "_fraction_cost", x)) + .rename(columns=lambda x: re.sub(r"_cost_fraction_cost$", "_cost", x)) + ) + + # Re-unify the cost and heat content information: + df = pd.merge( + mmbtu_group, cost_group, left_index=True, right_index=True + ).reset_index() + + # Label each plant-year record by primary fuel: + df.loc[:, ["primary_fuel_by_cost", "primary_fuel_by_mmbtu"]] = pd.NA + df = df.astype( + { + "primary_fuel_by_cost": pd.StringDtype(), + "primary_fuel_by_mmbtu": pd.StringDtype(), + } + ) + for fuel_str in fuel_categories: + try: + mmbtu_mask = df[f"{fuel_str}_fraction_mmbtu"] > thresh + df.loc[mmbtu_mask, "primary_fuel_by_mmbtu"] = fuel_str + except KeyError: + pass + + try: + cost_mask = df[f"{fuel_str}_fraction_cost"] > thresh + df.loc[cost_mask, "primary_fuel_by_cost"] = fuel_str + except KeyError: + pass + + df[["primary_fuel_by_cost", "primary_fuel_by_mmbtu"]] = df[ + ["primary_fuel_by_cost", "primary_fuel_by_mmbtu"] + ].fillna("") + + return df diff --git a/src/pudl/analysis/record_linkage/__init__.py b/src/pudl/analysis/record_linkage/__init__.py new file mode 100644 index 0000000000..f7a6a17052 --- /dev/null +++ b/src/pudl/analysis/record_linkage/__init__.py @@ -0,0 +1,7 @@ +"""This module impolements models for various forms of record linkage.""" +from . import ( + classify_plants_ferc1, + embed_dataframe, + link_cross_year, + name_cleaner, +) diff --git a/src/pudl/analysis/record_linkage/classify_plants_ferc1.py b/src/pudl/analysis/record_linkage/classify_plants_ferc1.py new file mode 100644 index 0000000000..78010b265b --- /dev/null +++ b/src/pudl/analysis/record_linkage/classify_plants_ferc1.py @@ -0,0 +1,164 @@ +"""Scikit-Learn classification pipeline for identifying related FERC 1 plant records. + +Sadly FERC doesn't provide any kind of real IDs for the plants that report to them -- +all we have is their names (a freeform string) and the data that is reported alongside +them. This is often enough information to be able to recognize which records ought to be +associated with each other year to year to create a continuous time series. 
However, we +want to do that programmatically, which means using some clustering / categorization +tools from scikit-learn +""" + +import pandas as pd +from dagster import graph_asset, op + +import pudl +from pudl.analysis.record_linkage import embed_dataframe +from pudl.analysis.record_linkage.link_cross_year import link_ids_cross_year + +logger = pudl.logging_helpers.get_logger(__name__) + + +_FUEL_COLS = [ + "coal_fraction_mmbtu", + "gas_fraction_mmbtu", + "nuclear_fraction_mmbtu", + "oil_fraction_mmbtu", + "waste_fraction_mmbtu", +] + + +ferc_dataframe_embedder = embed_dataframe.dataframe_embedder_factory( + { + "plant_name": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.NameCleaner(), + embed_dataframe.TextVectorizer(), + ], + weight=2.0, + columns=["plant_name_ferc1"], + ), + "plant_type": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"), + embed_dataframe.CategoricalVectorizer(), + ], + weight=2.0, + columns=["plant_type"], + ), + "construction_type": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"), + embed_dataframe.CategoricalVectorizer(), + ], + columns=["construction_type"], + ), + "capacity_mw": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericalVectorizer(), + ], + columns=["capacity_mw"], + ), + "construction_year": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="fix_int_na"), + embed_dataframe.CategoricalVectorizer(), + ], + columns=["construction_year"], + ), + "utility_id_ferc1": embed_dataframe.ColumnVectorizer( + transform_steps=[embed_dataframe.CategoricalVectorizer()], + columns=["utility_id_ferc1"], + ), + "fuel_fractions": embed_dataframe.ColumnVectorizer( + transform_steps=[ + embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"), + embed_dataframe.NumericalVectorizer(), + embed_dataframe.NumericalNormalizer(), + ], + columns=_FUEL_COLS, + ), + } +) + + +@op +def plants_steam_validate_ids( + ferc1_steam_df: pd.DataFrame, label_df: pd.DataFrame +) -> pd.DataFrame: + """Tests that plant_id_ferc1 times series includes one record per year. + + Args: + ferc1_steam_df: A DataFrame of the data from the FERC 1 Steam table. + label_df: A DataFrame containing column of newly assigned plant labels. + + Returns: + The input dataframe, to enable method chaining. + """ + # Add column of labels to steam df + ferc1_steam_df.loc[:, "plant_id_ferc1"] = label_df["record_label"] + + ########################################################################## + # FERC PLANT ID ERROR CHECKING STUFF + ########################################################################## + + # Test to make sure that we don't have any plant_id_ferc1 time series + # which include more than one record from a given year. Warn the user + # if we find such cases (which... we do, as of writing) + year_dupes = ( + ferc1_steam_df.groupby(["plant_id_ferc1", "report_year"]) + .size() + .rename("year_dupes") + .reset_index() + .query("year_dupes>1") + ) + if len(year_dupes) > 0: + for dupe in year_dupes.itertuples(): + logger.error( + f"Found report_year={dupe.report_year} " + f"{dupe.year_dupes} times in " + f"plant_id_ferc1={dupe.plant_id_ferc1}" + ) + else: + logger.info("No duplicate years found in any plant_id_ferc1. 
Hooray!") + + return ferc1_steam_df + + +@op +def merge_steam_fuel_dfs( + ferc1_steam_df: pd.DataFrame, + fuel_fractions: pd.DataFrame, +) -> pd.DataFrame: + """Merge steam plants and fuel dfs to prepare inputs for ferc plant matching.""" + ffc = list(fuel_fractions.filter(regex=".*_fraction_mmbtu$").columns) + + # Grab fuel consumption proportions for use in assigning plant IDs: + return ferc1_steam_df.merge( + fuel_fractions[["utility_id_ferc1", "plant_name_ferc1", "report_year"] + ffc], + on=["utility_id_ferc1", "plant_name_ferc1", "report_year"], + how="left", + ).astype({"plant_type": str, "construction_type": str}) + + +@graph_asset +def _out_ferc1__yearly_steam_plants_sched402_with_plant_ids( + core_ferc1__yearly_steam_plants_sched402: pd.DataFrame, + out_ferc1__yearly_steam_plants_fuel_by_plant_sched402: pd.DataFrame, +) -> pd.DataFrame: + """Assign IDs to the large steam plants.""" + ########################################################################### + # FERC PLANT ID ASSIGNMENT + ########################################################################### + # Now we need to assign IDs to the large steam plants, since FERC doesn't + # do this for us. + logger.info("Identifying distinct large FERC plants for ID assignment.") + + input_df = merge_steam_fuel_dfs( + core_ferc1__yearly_steam_plants_sched402, + out_ferc1__yearly_steam_plants_fuel_by_plant_sched402, + ) + feature_matrix = ferc_dataframe_embedder(input_df) + label_df = link_ids_cross_year(input_df, feature_matrix) + + return plants_steam_validate_ids(core_ferc1__yearly_steam_plants_sched402, label_df) diff --git a/src/pudl/analysis/record_linkage/embed_dataframe.py b/src/pudl/analysis/record_linkage/embed_dataframe.py new file mode 100644 index 0000000000..46c9aa1ca0 --- /dev/null +++ b/src/pudl/analysis/record_linkage/embed_dataframe.py @@ -0,0 +1,183 @@ +"""Tools for embedding a DataFrame to create feature matrix for models.""" +from abc import ABC, abstractmethod +from dataclasses import dataclass + +import numpy as np +import pandas as pd +import scipy +from dagster import graph, op +from pydantic import BaseModel +from sklearn.base import BaseEstimator +from sklearn.compose import ColumnTransformer +from sklearn.feature_extraction.text import TfidfVectorizer +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import ( + FunctionTransformer, + MinMaxScaler, + Normalizer, + OneHotEncoder, +) + +import pudl +from pudl.analysis.record_linkage.name_cleaner import CompanyNameCleaner + + +@dataclass +class FeatureMatrix: + """Class to wrap a feature matrix returned from dataframe embedding. + + Depending on the transformations applied, a feature matrix may be sparse or dense + matrix. Using this wrapper enables Dagsters type checking while allowing both dense + and sparse matrices underneath. + """ + + matrix: np.ndarray | scipy.sparse.csr_matrix + + +class TransformStep(BaseModel, ABC): + """TransformStep's can be combined to vectorize one or more columns. + + This class defines a very simple interface for TransformStep's, which essentially + says that a TransformStep should take configuration and implement the method as_transformer. + """ + + name: str + + @abstractmethod + def as_transformer(self) -> BaseEstimator: + """This method should use configuration to produce a :class:`sklearn.base.BaseEstimator`.""" + ... 
+ + +class ColumnVectorizer(BaseModel): + """Define a set of transformations to apply to one or more columns.""" + + transform_steps: list[TransformStep] + weight: float = 1.0 + columns: list[str] + + def as_pipeline(self): + """Return :class:`sklearn.pipeline.Pipeline` with configuration.""" + return Pipeline( + [ + ( + step.name, + step.as_transformer(), + ) + for step in self.transform_steps + ] + ) + + +def dataframe_embedder_factory(vectorizers: dict[str, ColumnVectorizer]): + """Return a configured op graph to embed an input dataframe.""" + + @op + def train_dataframe_embedder(df: pd.DataFrame): + """Train :class:`sklearn.compose.ColumnTransformer` on input.""" + column_transformer = ColumnTransformer( + transformers=[ + (name, column_transform.as_pipeline(), column_transform.columns) + for name, column_transform in vectorizers.items() + ], + transformer_weights={ + name: column_transform.weight + for name, column_transform in vectorizers.items() + }, + ) + + return column_transformer.fit(df) + + @op + def apply_dataframe_embedder(df: pd.DataFrame, transformer: ColumnTransformer): + """Use :class:`sklearn.compose.ColumnTransformer` to transform input.""" + return FeatureMatrix(matrix=transformer.transform(df)) + + @graph + def embed_dataframe(df: pd.DataFrame) -> FeatureMatrix: + """Train dataframe embedder and apply to input df.""" + transformer = train_dataframe_embedder(df) + return apply_dataframe_embedder(df, transformer) + + return embed_dataframe + + +class TextVectorizer(TransformStep): + """Implement TransformStep for :class:`sklearn.feature_extraction.text.TfidfVectorizer`.""" + + name: str = "tfidf_vectorizer" + + #: See sklearn documentation for all options + options: dict = {"analyzer": "char", "ngram_range": (2, 10)} + + def as_transformer(self): + """Return configured TfidfVectorizer.""" + return TfidfVectorizer(**self.options) + + +class CategoricalVectorizer(TransformStep): + """Implement TransformStep for :class:`sklearn.preprocessing.OneHotEncoder`.""" + + name: str = "one_hot_encoder_vectorizer" + + options: dict = {"categories": "auto"} + + def as_transformer(self): + """Return configured OneHotEncoder.""" + return OneHotEncoder(**self.options) + + +class NumericalVectorizer(TransformStep): + """Implement ColumnTransformation for MinMaxScaler.""" + + name: str = "numerical_vectorizer" + options: dict = {} + + def as_transformer(self): + """Return configured MinMaxScalerConfig.""" + return MinMaxScaler(**self.options) + + +class NumericalNormalizer(TransformStep): + """Implement ColumnTransformation for Normalizer.""" + + name: str = "numerical_normalizer" + options: dict = {} + + def as_transformer(self): + """Return configured NormalizerConfig.""" + return Normalizer(**self.options) + + +def _apply_cleaning_func(df, function_key: str = None): + function_transforms = { + "null_to_zero": lambda df: df.fillna(value=0.0), + "null_to_empty_str": lambda df: df.fillna(value=""), + "fix_int_na": lambda df: pudl.helpers.fix_int_na(df, columns=list(df.columns)), + } + + return function_transforms[function_key](df) + + +class ColumnCleaner(TransformStep): + """Implement ColumnTransformation for cleaning functions.""" + + name: str = "column_cleaner" + cleaning_function: str + + def as_transformer(self): + """Return configured NormalizerConfig.""" + return FunctionTransformer( + _apply_cleaning_func, kw_args={"function_key": self.cleaning_function} + ) + + +class NameCleaner(TransformStep): + """Implement ColumnTransformation for CompanyNameCleaner.""" + + name: str = 
"name_cleaner" + company_cleaner: CompanyNameCleaner = CompanyNameCleaner() + + def as_transformer(self): + """Return configured CompanyNameCleaner.""" + return FunctionTransformer(self.company_cleaner.apply_name_cleaning) diff --git a/src/pudl/analysis/record_linkage/link_cross_year.py b/src/pudl/analysis/record_linkage/link_cross_year.py new file mode 100644 index 0000000000..1054897cb4 --- /dev/null +++ b/src/pudl/analysis/record_linkage/link_cross_year.py @@ -0,0 +1,273 @@ +"""Define a record linkage model interface and implement common functionality.""" +from pathlib import Path +from tempfile import TemporaryDirectory + +import numpy as np +import pandas as pd +from dagster import Config, graph, op +from numba import njit +from numba.typed import List +from sklearn.cluster import DBSCAN, AgglomerativeClustering +from sklearn.metrics import pairwise_distances_chunked +from sklearn.neighbors import NearestNeighbors + +import pudl +from pudl.analysis.record_linkage.embed_dataframe import FeatureMatrix + +logger = pudl.logging_helpers.get_logger(__name__) + + +class PenalizeReportYearDistanceConfig(Config): + """Compute distance between records and add penalty to records from same year.""" + + distance_penalty: float = 10000.0 + metric: str = "cosine" + + +class DistanceMatrix: + """Class to wrap a distance matrix saved in a np.memmap.""" + + def __init__( + self, + feature_matrix: np.ndarray, + original_df: pd.DataFrame, + config: PenalizeReportYearDistanceConfig, + ): + """Compute distance matrix from feature_matrix and write to memmap.""" + self.file_buffer = TemporaryDirectory() + + filename = Path(self.file_buffer.name) / "distance_matrix.dat" + self.distance_matrix = np.memmap( + filename, + dtype="float32", + mode="w+", + shape=(feature_matrix.shape[0], feature_matrix.shape[0]), + ) + + # Compute distances in chunks and write to memmap + row_start = 0 + for chunk in pairwise_distances_chunked(feature_matrix, metric=config.metric): + self.distance_matrix[row_start : row_start + len(chunk), :] = chunk[:, :] + self.distance_matrix.flush() + row_start += len(chunk) + + # Apply distance penalty to records from the same year + year_inds = original_df.groupby("report_year").indices + for inds in year_inds.values(): + matching_year_inds = np.array(np.meshgrid(inds, inds)).T.reshape(-1, 2) + self.distance_matrix[ + matching_year_inds[:, 0], matching_year_inds[:, 1] + ] = config.distance_penalty + + np.fill_diagonal(self.distance_matrix, 0) + self.distance_matrix.flush() + + # Convert distance matrix to read only memory map + self.distance_matrix = np.memmap( + filename, + dtype="float32", + mode="r", + shape=(feature_matrix.shape[0], feature_matrix.shape[0]), + ) + + +def get_cluster_distance_matrix( + distance_matrix: np.ndarray, cluster_inds: np.ndarray +) -> np.ndarray: + """Return a distance matrix with only distances within a cluster.""" + cluster_size = len(cluster_inds) + dist_inds = np.array(np.meshgrid(cluster_inds, cluster_inds)).T.reshape(-1, 2) + return distance_matrix[dist_inds[:, 0], dist_inds[:, 1]].reshape( + (cluster_size, cluster_size) + ) + + +@njit +def get_average_distance_matrix( + distance_matrix: np.ndarray, + cluster_groups: list[list[int]], +) -> np.ndarray: + """Compute average distance between two clusters of records given indices of each cluster.""" + # Prepare a distance matrix of (n_clusters x n_clusters) + # Distance matrix contains average distance between each cluster + n_clusters = len(cluster_groups) + average_dist_matrix = np.zeros((n_clusters, 
n_clusters)) + + # Heavy nested looping optimized by numba + for i, cluster_i in enumerate(cluster_groups): + for j, cluster_j in enumerate(cluster_groups[:i]): + total_dist = 0 + for cluster_i_ind in cluster_i: + for cluster_j_ind in cluster_j: + total_dist += distance_matrix[cluster_i_ind, cluster_j_ind] + + average_dist = total_dist / (len(cluster_i) + len(cluster_j)) + average_dist_matrix[i, j] = average_dist + average_dist_matrix[j, i] = average_dist + + return average_dist_matrix + + +@op +def compute_distance_with_year_penalty( + config: PenalizeReportYearDistanceConfig, + feature_matrix: FeatureMatrix, + original_df: pd.DataFrame, +) -> DistanceMatrix: + """Compute a distance matrix and penalize records from the same year.""" + return DistanceMatrix(feature_matrix.matrix, original_df, config) + + +class DBSCANConfig(Config): + """Configuration for DBSCAN step.""" + + #: See :class:`sklearn.cluster.DBSCAN` for details. + eps: float = 0.5 + min_samples: int = 1 + + +@op +def cluster_records_dbscan( + config: DBSCANConfig, distance_matrix: DistanceMatrix, original_df: pd.DataFrame +) -> pd.DataFrame: + """Generate initial IDs using DBSCAN algorithm.""" + # DBSCAN is very efficient when passed a sparse radius neighbor graph + neighbor_computer = NearestNeighbors(radius=config.eps, metric="precomputed") + neighbor_computer.fit(distance_matrix.distance_matrix) + neighbor_graph = neighbor_computer.radius_neighbors_graph(mode="distance") + + # Classify records + classifier = DBSCAN(metric="precomputed", eps=config.eps, min_samples=2) + + # Create dataframe containing only report year and label columns + return pd.DataFrame( + { + "report_year": original_df.loc[:, "report_year"], + "record_label": classifier.fit_predict(neighbor_graph), + } + ) + + +class SplitClustersConfig(Config): + """Configuration for AgglomerativeClustering used to split overmerged clusters.""" + + #: See :class:`sklearn.cluster.AgglomerativeClustering` for details. + distance_threshold: float = 0.5 + + +@op +def split_clusters( + config: SplitClustersConfig, + distance_matrix: DistanceMatrix, + id_year_df: pd.DataFrame, +) -> pd.DataFrame: + """Split clusters with multiple records from same report_year. + + DBSCAN will sometimes match records from the same report year, which breaks the + assumption that there should only be one record for each entity from a single + report year. To fix this, agglomerative clustering will be applied to each + such cluster. Agglomerative clustering could replace DBSCAN in the initial linkage + step to avoid these matches in the first place, however, it is very inneficient on + a large number of records, so applying to smaller sets of overmerged records is + much faster and uses much less memory. 
+ """ + + def _generate_cluster_ids(max_cluster_id: int) -> int: + """Get new unique cluster id.""" + while True: + max_cluster_id += 1 + yield max_cluster_id + + cluster_id_generator = _generate_cluster_ids(id_year_df.record_label.max()) + classifier = AgglomerativeClustering( + metric="precomputed", + linkage="average", + distance_threshold=config.distance_threshold, + n_clusters=None, + ) + duplicated_ids = id_year_df.loc[ + id_year_df.duplicated(subset=["report_year", "record_label"]), + "record_label", + ] + + for duplicated_id in duplicated_ids.unique(): + # IDs of -1 will be handled seperately + if duplicated_id == -1: + continue + + cluster_inds = id_year_df[ + id_year_df.record_label == duplicated_id + ].index.to_numpy() + cluster_distances = get_cluster_distance_matrix( + distance_matrix.distance_matrix, cluster_inds + ) + + new_labels = classifier.fit_predict(cluster_distances) + for new_label in np.unique(new_labels): + df_inds = cluster_inds[new_labels == new_label] + id_year_df.loc[df_inds, "record_label"] = next(cluster_id_generator) + + return id_year_df + + +class MatchOrpahnedRecordsConfig(Config): + """Configuration for :func:`match_orphaned_records` op.""" + + #: See :class:`sklearn.cluster.AgglomerativeClustering` for details. + distance_threshold: float = 0.5 + + +@op +def match_orphaned_records( + config: MatchOrpahnedRecordsConfig, + distance_matrix: DistanceMatrix, + id_year_df: pd.DataFrame, +) -> pd.DataFrame: + """DBSCAN assigns 'noisy' records a label of '-1', which will be labeled by this step. + + To label orphaned records, points are seperated into clusters where each orphaned record + is a cluster of a single point. Then, a distance matrix is computed with the average + distance between each cluster, and is used in a round of agglomerative clustering. + This will match orphaned records to existing clusters, or assign them unique ID's if + they don't appear close enough to any existing clusters. 
+ """ + classifier = AgglomerativeClustering( + metric="precomputed", + linkage="average", + distance_threshold=config.distance_threshold, + n_clusters=None, + ) + + cluster_inds = id_year_df.groupby("record_label").indices + + # Orphaned records are considered a cluster of a single record + cluster_groups = [List([ind]) for ind in cluster_inds.get(-1, [])] + + # Get list of all points in each assigned cluster + cluster_groups += [List(inds) for key, inds in cluster_inds.items() if key != -1] + cluster_groups = List(cluster_groups) + + average_dist_matrix = get_average_distance_matrix( + distance_matrix.distance_matrix, cluster_groups + ) + + # Assign new labels to all points + new_labels = classifier.fit_predict(average_dist_matrix) + for inds, label in zip(cluster_groups, new_labels): + id_year_df.loc[inds, "record_label"] = label + + return id_year_df + + +@graph +def link_ids_cross_year(df: pd.DataFrame, feature_matrix: FeatureMatrix): + """Apply model and return column of estimated record labels.""" + # Compute distances and apply penalty for records from same year + distance_matrix = compute_distance_with_year_penalty(feature_matrix, df) + + # Label records + id_year_df = cluster_records_dbscan(distance_matrix, df) + id_year_df = split_clusters(distance_matrix, id_year_df) + id_year_df = match_orphaned_records(distance_matrix, id_year_df) + + return id_year_df diff --git a/src/pudl/analysis/record_linkage/name_cleaner.py b/src/pudl/analysis/record_linkage/name_cleaner.py new file mode 100644 index 0000000000..f2f316ea0b --- /dev/null +++ b/src/pudl/analysis/record_linkage/name_cleaner.py @@ -0,0 +1,269 @@ +"""This module contains the implementation of CompanyNameCleaner class from OS-Climate's financial-entity-cleaner package.""" + +import enum +import json +import logging +import re +from importlib.resources import as_file, files +from typing import Literal + +import pandas as pd +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +CLEANING_RULES_DICT = { + "remove_email": [" ", r"\S*@\S*\s?"], + "remove_url": [" ", r"https*\S+"], + "remove_word_the_from_the_end": [" ", r"the$"], + "place_word_the_at_the_beginning": [" ", r"the$"], + "remove_www_address": [" ", r"https?://[.\w]{3,}|www.[.\w]{3,}"], + "enforce_single_space_between_words": [" ", r"\s+"], + "replace_amperstand_by_AND": [" and ", r"&"], + "add_space_between_amperstand": [" & ", r"&"], + "add_space_before_opening_parentheses": [" (", r"\("], + "add_space_after_closing_parentheses": [") ", r"\)"], + "replace_amperstand_between_space_by_AND": [" and ", r"\s+&\s+"], + "replace_hyphen_by_space": [" ", r"-"], + "replace_hyphen_between_spaces_by_single_space": [" ", r"\s+-\s+"], + "replace_underscore_by_space": [" ", r"_"], + "replace_underscore_between_spaces_by_single_space": [" ", r"\s+_\s+"], + "remove_all_punctuation": [" ", r"([^\w\s])"], + "remove_punctuation_except_dot": [" ", r"([^\w\s.])"], + "remove_mentions": [" ", r"@\S+"], + "remove_hashtags": [" ", r"#\S+"], + "remove_numbers": [" ", r"\w*\d+\w*"], + "remove_text_puctuation": [" ", r'\;|\:|\,|\.|\?|\!|"'], + "remove_text_puctuation_except_dot": [" ", r'\;|\:|\,|\?|\!|"'], + "remove_math_symbols": [" ", r"\+|\-|\*|\>|\<|\=|\%"], + "remove_math_symbols_except_dash": [" ", r"\+|\*|\>|\<|\=|\%"], + "remove_parentheses": ["", r"\(|\)"], + "remove_brackets": ["", r"\[|\]"], + "remove_curly_brackets": ["", r"\{|\}"], + "remove_single_quote_next_character": [" ", r"'\w+"], + "remove_single_quote": [" ", r"'"], + "remove_double_quote": [" ", r'"'], 
+ "remove_words_in_parentheses": [" ", r"\([^()]*\)"], + "repeat_remove_words_in_parentheses": [" ", r"remove_words_in_parentheses"], +} + + +class LegalTermLocation(enum.Enum): + """The location of the legal terms within the name string.""" + + AT_THE_END = 1 + ANYWHERE = 2 + + +class CompanyNameCleaner(BaseModel): + """Class to normalize/clean up text based company names.""" + + # Constants used internally by the class + __NAME_LEGAL_TERMS_DICT_FILE = "us_legal_forms.json" + __NAME_JSON_ENTRY_LEGAL_TERMS = "legal_forms" + + #: A flag to indicate if the cleaning process must normalize + #: text's legal terms. e.g. LTD => LIMITED. + cleaning_rules_list: list[str] = [ + "replace_amperstand_between_space_by_AND", + "replace_hyphen_between_spaces_by_single_space", + "replace_underscore_by_space", + "replace_underscore_between_spaces_by_single_space", + "remove_text_puctuation_except_dot", + "remove_math_symbols", + "remove_words_in_parentheses", + "remove_parentheses", + "remove_brackets", + "remove_curly_brackets", + "enforce_single_space_between_words", + ] + + #: A flag to indicate if the cleaning process must normalize + normalize_legal_terms: bool = True + + #: Define if unicode characters should be removed from text's name + #: This cleaning rule is treated separated from the regex rules because it depends on the + #: language of the text's name. For instance, russian or japanese text's may contain + #: unicode characters, while portuguese and french companies may not. + remove_unicode: bool = False + + #: Define the letter case of the cleaning output + output_lettercase: Literal["lower", "title"] = "lower" + + #: Where in the string are legal terms found + legal_term_location: LegalTermLocation = LegalTermLocation.AT_THE_END + + #: Define if the letters with accents are replaced with non-accented ones + remove_accents: bool = False + + def _apply_regex_rules( + self, str_value: str, dict_regex_rules: dict[str, list[str]] + ) -> str: + r"""Applies several cleaning rules based on a custom dictionary. + + The dictionary must contain cleaning rules written in regex format. + + Arguments: + str_value (str): any value as string to be cleaned up. + dict_regex_rules (dict): a dictionary of cleaning rules writen in regex with the format + [rule name] : ['replacement', 'regex rule'] + + Returns: + (str): the modified/cleaned value. + """ + clean_value = str_value + # Iterate through the dictionary and apply each regex rule + for name_rule, cleaning_rule in dict_regex_rules.items(): + # First element is the replacement + replacement = cleaning_rule[0] + # Second element is the regex rule + regex_rule = cleaning_rule[1] + + # Check if the regex rule is actually a reference to another regex rule. 
+ # By adding a name of another regex rule in the place of the rule itself allows the execution + # of a regex rule twice + if regex_rule in dict_regex_rules: + replacement = dict_regex_rules[cleaning_rule[1]][0] + regex_rule = dict_regex_rules[cleaning_rule[1]][1] + + # Make sure to use raw string + regex_rule = rf"{regex_rule}" + + # Treat the special case of the word THE at the end of a text's name + found_the_word_the = None + if name_rule == "place_word_the_at_the_beginning": + found_the_word_the = re.search(regex_rule, clean_value) + + # Apply the regex rule + clean_value = re.sub(regex_rule, replacement, clean_value) + + # Adjust the name for the case of rule + if found_the_word_the is not None: + clean_value = "the " + clean_value + + return clean_value + + def _remove_unicode_chars(self, value: str) -> str: + """Removes unicode character that is unreadable when converted to ASCII format. + + Arguments: + value (str): any string containing unicode characters. + + Returns: + (str): the corresponding input string without unicode characters. + """ + # Remove all unicode characters if any + clean_value = value.encode("ascii", "ignore").decode() + return clean_value + + def _apply_cleaning_rules(self, company_name: str) -> str: + """Apply the cleaning rules from the dictionary of regex rules.""" + cleaning_dict = {} + for rule_name in self.cleaning_rules_list: + cleaning_dict[rule_name] = CLEANING_RULES_DICT[rule_name] + + # Apply all the cleaning rules + clean_company_name = self._apply_regex_rules(company_name, cleaning_dict) + return clean_company_name + + def _apply_normalization_of_legal_terms(self, company_name: str) -> str: + """Apply the normalizattion of legal terms according to dictionary of regex rules.""" + # Make sure to remove extra spaces, so legal terms can be found in the end (if requested) + clean_company_name = company_name.strip() + + # The dictionary of legal terms define how to normalize the text's legal form abreviations + json_source = files("pudl.package_data.settings").joinpath( + self.__NAME_LEGAL_TERMS_DICT_FILE + ) + with as_file(json_source) as json_file_path: + _dict_legal_terms = json.load(json_file_path.open())[ + self.__NAME_JSON_ENTRY_LEGAL_TERMS + ]["en"] + + # Apply normalization for legal terms + # Iterate through the dictionary of legal terms + for replacement, legal_terms in _dict_legal_terms.items(): + # Each replacement has a list of possible terms to be searched for + replacement = " " + replacement.lower() + " " + for legal_term in legal_terms: + # Make sure to use raw string + legal_term = legal_term.lower() + # If the legal term has . (dots), then apply regex directly on the legal term + # Otherwise, if it's a legal term with only letters in sequence, make sure + # that regex find the legal term as a word (\\bLEGAL_TERM\\b) + if legal_term.find(".") > -1: + legal_term = legal_term.replace(".", "\\.") + else: + legal_term = "\\b" + legal_term + "\\b" + # Check if the legal term should be found only at the end of the string + if self.legal_term_location == LegalTermLocation.AT_THE_END: + legal_term = legal_term + "$" + # ...and it's a raw string + regex_rule = rf"{legal_term}" + # Apply the replacement + clean_company_name = re.sub(regex_rule, replacement, clean_company_name) + return clean_company_name + + def get_clean_data(self, company_name: str) -> str: + """Clean a name and normalize legal terms. + + If ``company_name`` is null or not a string value, pd.NA + will be returned. 
+ + Arguments: + company_name (str): the original text + + Returns: + clean_company_name (str): the clean version of the text + """ + if not isinstance(company_name, str): + if company_name is not pd.NA: + logger.warning(f"{company_name} is not a string.") + return pd.NA + + # Remove all unicode characters in the text's name, if requested + if self.remove_unicode: + clean_company_name = self._remove_unicode_chars(company_name) + else: + clean_company_name = company_name + + # Remove space in the beginning and in the end and convert it to lower case + clean_company_name = clean_company_name.strip().lower() + + # Apply all the cleaning rules + clean_company_name = self._apply_cleaning_rules(clean_company_name) + + # Apply normalization for legal terms + if self.normalize_legal_terms: + clean_company_name = self._apply_normalization_of_legal_terms( + clean_company_name + ) + + # Apply the letter case, if different from 'lower' + if self.output_lettercase == "upper": + clean_company_name = clean_company_name.upper() + elif self.output_lettercase == "title": + clean_company_name = clean_company_name.title() + + # Remove excess of white space that might be introduced during previous cleaning + clean_company_name = clean_company_name.strip() + clean_company_name = re.sub(r"\s+", " ", clean_company_name) + + return clean_company_name + + def apply_name_cleaning( + self, + df: pd.DataFrame, + ) -> pd.Series: + """Clean up text names in a dataframe. + + Arguments: + df (dataframe): the input dataframe that contains the text's name to be cleaned + in_company_name_attribute (str): the attribute in the dataframe that contains the names + out_company_name_attribute (str): the attribute to be created for the clean version of + the text's name + + Returns: + df (dataframe): the clean version of the input dataframe + """ + return df.squeeze().apply(self.get_clean_data) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index cade85ebe7..029600cbe9 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -78,6 +78,10 @@ *load_assets_from_modules( [pudl.analysis.state_demand], group_name="out_state_demand_ferc714" ), + *load_assets_from_modules( + [pudl.analysis.record_linkage.classify_plants_ferc1], + group_name="out_ferc1", + ), *load_assets_from_modules( [pudl.analysis.plant_parts_eia, pudl.analysis.eia_ferc1_record_linkage], group_name="eia_ferc1_record_linkage", diff --git a/src/pudl/metadata/resources/ferc1.py b/src/pudl/metadata/resources/ferc1.py index 6f64e0ccf9..1580fa1a24 100644 --- a/src/pudl/metadata/resources/ferc1.py +++ b/src/pudl/metadata/resources/ferc1.py @@ -596,7 +596,6 @@ "record_id", "utility_id_ferc1", "report_year", - "plant_id_ferc1", "plant_name_ferc1", "plant_type", "construction_type", diff --git a/src/pudl/output/ferc1.py b/src/pudl/output/ferc1.py index a2ef5fa7e0..edf6f68247 100644 --- a/src/pudl/output/ferc1.py +++ b/src/pudl/output/ferc1.py @@ -205,7 +205,7 @@ def _out_ferc1__yearly_plants_utilities( @asset(io_manager_key="pudl_sqlite_io_manager", compute_kind="Python") def _out_ferc1__yearly_steam_plants_sched402( _out_ferc1__yearly_plants_utilities: pd.DataFrame, - core_ferc1__yearly_steam_plants_sched402: pd.DataFrame, + _out_ferc1__yearly_steam_plants_sched402_with_plant_ids: pd.DataFrame, ) -> pd.DataFrame: """Select and joins some useful fields from the FERC Form 1 steam table. 
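Taken together with the `FunctionTransformer` wrapper at the top of this diff, the cleaner is meant to be dropped into a column-embedding pipeline, but it can also be exercised directly. A small usage sketch follows; the company names and the expected output are illustrative only, and it assumes the `name_cleaner` module added in this diff is importable.

```python
# Hedged usage sketch of CompanyNameCleaner (illustrative names, not PUDL data).
import pandas as pd

from pudl.analysis.record_linkage.name_cleaner import CompanyNameCleaner

cleaner = CompanyNameCleaner()  # default rules, lowercase output, legal terms at end

# Single string: "&" becomes "and" and the trailing legal form is normalized,
# so "Acme Power & Light Co." should come back roughly as
# "acme power and light company".
print(cleaner.get_clean_data("Acme Power & Light Co."))

# Column-wise cleaning, as used by the FunctionTransformer wrapper earlier in
# this diff: apply_name_cleaning() expects a single-column dataframe.
names = pd.DataFrame({"plant_name_ferc1": ["Duke Energy Corp.", "Basin Irr. Dist."]})
print(cleaner.apply_name_cleaning(names))
```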
@@ -218,13 +218,14 @@ def _out_ferc1__yearly_steam_plants_sched402( Args: _out_ferc1__yearly_plants_utilities: Denormalized dataframe of FERC Form 1 plants and utilities data. - core_ferc1__yearly_steam_plants_sched402: The normalized FERC Form 1 steam table. + _out_ferc1__yearly_steam_plants_sched402_with_plant_ids: The FERC Form 1 steam table + with imputed plant IDs to group plants across report years. Returns: A DataFrame containing useful fields from the FERC Form 1 steam table. """ steam_df = ( - core_ferc1__yearly_steam_plants_sched402.merge( + _out_ferc1__yearly_steam_plants_sched402_with_plant_ids.merge( _out_ferc1__yearly_plants_utilities, on=["utility_id_ferc1", "plant_name_ferc1"], how="left", @@ -908,7 +909,7 @@ def out_ferc1__yearly_steam_plants_fuel_by_plant_sched402( """Summarize FERC fuel data by plant for output. This is mostly a wrapper around - :func:`pudl.analysis.classify_plants_ferc1.fuel_by_plant_ferc1` + :func:`pudl.analysis.record_linkage.classify_plants_ferc1.fuel_by_plant_ferc1` which calculates some summary values on a per-plant basis (as indicated by ``utility_id_ferc1`` and ``plant_name_ferc1``) related to fuel consumption. @@ -947,12 +948,12 @@ def drop_other_fuel_types(df): fbp_df = ( core_ferc1__yearly_steam_plants_fuel_sched402.pipe(drop_other_fuel_types) .pipe( - pudl.analysis.classify_plants_ferc1.fuel_by_plant_ferc1, + pudl.analysis.fuel_by_plant.fuel_by_plant_ferc1, fuel_categories=fuel_categories, thresh=thresh, ) - .pipe(pudl.analysis.classify_plants_ferc1.revert_filled_in_float_nulls) - .pipe(pudl.analysis.classify_plants_ferc1.revert_filled_in_string_nulls) + .pipe(pudl.analysis.fuel_by_plant.revert_filled_in_float_nulls) + .pipe(pudl.analysis.fuel_by_plant.revert_filled_in_string_nulls) .merge( _out_ferc1__yearly_plants_utilities, on=["utility_id_ferc1", "plant_name_ferc1"], diff --git a/src/pudl/package_data/settings/us_legal_forms.json b/src/pudl/package_data/settings/us_legal_forms.json new file mode 100644 index 0000000000..00d1a12e9e --- /dev/null +++ b/src/pudl/package_data/settings/us_legal_forms.json @@ -0,0 +1,222 @@ +{ + "reference_license": "//SPDX-License-Identifier: CC0-1.0", + "reference_url": "https://www.gleif.org/en/about-lei/code-lists/iso-20275-entity-legal-forms-code-list", + "reference_version": "1.3", + "created": "2021-09-20", + "updated": "2022-06-09", + "legal_forms": { + "en": { + "registered limited liability limited partnership": [ + "r.l.l.l.p.", + "r.l.l.l.p", + "rlllp.", + "rlllp" + ], + "registered limited liability partnership": [ + "r.l.l.p.", + "r.l.l.p", + "rllp.", + "rllp" + ], + "professional limited liability limited partnership": [ + "p.l.l.l.p.", + "p.l.l.l.p", + "plllp.", + "plllp" + ], + "limited liability limited partnership": [ + "lllp.", + "lllp", + "l.l.l.p.", + "l.l.l.p" + ], + "professional limited liabity partnership": [ + "p.l.l.p.", + "p.l.l.p", + "pllp.", + "pllp" + ], + "professional limited liability company": [ + "p.l.l.c.", + "p.l.l.c", + "pllc.", + "pllc" + ], + "limited liability partnership": [ + "l.l.p.", + "l.l.p", + "llp.", + "llp" + ], + "low-profit limited liability company": [ + "l3c.", + "l3c", + "l.3.c.", + "l.3.c", + "lllc.", + "lllc", + "l.l.l.c.", + "l.l.l.c" + ], + "limited liability company": [ + "ltd. liability co.", + "ltd. liability co", + "ltd liability co.", + "ltd liability co", + "l.l.c.", + "l.l.c", + "llc.", + "llc" + ], + "public limited company": [ + "public lc.", + "public lc", + "pub. lc.", + "pub. 
lc", + "pub lc.", + "pub lc", + "p.l.c.", + "p.l.c", + "plc.", + "plc" + ], + "limited partnership": [ + "lp.", + "lp", + "l.p.", + "l.p", + "lp" + ], + "professional corporation": [ + "prof. corp.", + "prof corp.", + "prof corp", + "pro. corp.", + "pro corp.", + "pro corp", + "pc.", + "p.c.", + "p.c", + "pc" + ], + "professional association": [ + "prof. assoc.", + "prof assoc.", + "prof assoc", + "prof. assn.", + "prof assn.", + "prof assn", + "pro. assoc.", + "pro assoc.", + "pro assoc", + "pro. assn.", + "pro assn.", + "pro assn", + "pa.", + "p.a.", + "p.a", + "pa" + ], + "corporation": [ + "corp.", + "corp" + ], + "company": [ + "c.o.", + "co.", + "c.o", + "co" + ], + "real state investment trust": [ + "reit.", + "reit", + "r.e.i.t.", + "r.e.i.t" + ], + "general partnership": [ + "gp.", + "g.p.", + "gp" + ], + "commercial registered agent": [ + "c.r.a.", + "cra.", + "cra" + ], + "national trust and savings association": [ + "nt&sa", + "nt & sa", + "nt. & sa.", + "nt. & sa", + "nt & sa." + ], + "national association": [ + "n.a.", + "n.a", + "n. assoc.", + "n. assoc", + "n assoc.", + "n assoc" + ], + "authority": [ + "auth.", + "auth" + ], + "foundation": [ + "fdn.", + "fdn", + "f.d.n.", + "f.d.n" + ], + "cooperative": [ + "co-op.", + "co-op", + "coop.", + "coop" + ], + "association": [ + "assoc.", + "assoc", + "assn.", + "assn" + ], + "basin irrigation district": [ + "basin irr district", + "basin irr. dist", + "basin irr dist.", + "basin irr. dist.", + "basin irr dist" + ], + "limited": [ + "limited.", + "limit", + "ltd.", + "ltd", + "l.t.d.", + "l.t.d", + "lt.", + "lt" + ], + "unlimited": [ + "ultd.", + "unltd", + "ult.", + "ult" + ], + "incorporated": [ + "inc.", + "inc", + "incorp.", + "incorp" + ], + "district": [ + "dist.", + "dist" + ], + "commission": [ + "comm.", + "comm" + ] + } + } +} diff --git a/src/pudl/transform/ferc1.py b/src/pudl/transform/ferc1.py index fee273ce57..30a2471ce2 100644 --- a/src/pudl/transform/ferc1.py +++ b/src/pudl/transform/ferc1.py @@ -25,10 +25,6 @@ from pydantic import BaseModel, Field, field_validator import pudl -from pudl.analysis.classify_plants_ferc1 import ( - plants_steam_assign_plant_ids, - plants_steam_validate_ids, -) from pudl.extract.ferc1 import TABLE_NAME_MAP_FERC1 from pudl.helpers import assert_cols_areclose, convert_cols_dtypes from pudl.metadata.fields import apply_pudl_dtypes @@ -3211,70 +3207,6 @@ class SteamPlantsTableTransformer(Ferc1AbstractTableTransformer): table_id: TableIdFerc1 = TableIdFerc1.STEAM_PLANTS - @cache_df(key="main") - def transform_main( - self, df: pd.DataFrame, transformed_fuel: pd.DataFrame - ) -> pd.DataFrame: - """Perform table transformations for the :ref:`core_ferc1__yearly_steam_plants_sched402` table. - - Note that this method has a non-standard call signature, since the - :ref:`core_ferc1__yearly_steam_plants_sched402` table depends on the :ref:`core_ferc1__yearly_steam_plants_fuel_sched402` table. - - Args: - df: The pre-processed steam plants table. - transformed_fuel: The fully transformed :ref:`core_ferc1__yearly_steam_plants_fuel_sched402` table. 
This is - required because fuel consumption information is used to help link - steam plant records together across years using - :func:`plants_steam_assign_plant_ids` - """ - fuel_categories = list( - SteamPlantsFuelTableTransformer() - .params.categorize_strings["fuel_type_code_pudl"] - .categories.keys() - ) - plants_steam = ( - super() - .transform_main(df) - .pipe( - plants_steam_assign_plant_ids, - ferc1_fuel_df=transformed_fuel, - fuel_categories=fuel_categories, - ) - .pipe(plants_steam_validate_ids) - ) - return plants_steam - - def transform( - self, - raw_dbf: pd.DataFrame, - raw_xbrl_instant: pd.DataFrame, - raw_xbrl_duration: pd.DataFrame, - transformed_fuel: pd.DataFrame, - ) -> pd.DataFrame: - """Redfine the transform method to accommodate the use of transformed_fuel. - - This is duplicating code from the parent class, but is necessary because the - steam table needs the fuel table for its transform. Is there a better way to do - this that doesn't require cutting and pasting the whole method just to stick the - extra dataframe input into transform_main()? - """ - df = ( - self.transform_start( - raw_dbf=raw_dbf, - raw_xbrl_instant=raw_xbrl_instant, - raw_xbrl_duration=raw_xbrl_duration, - ) - .pipe(self.transform_main, transformed_fuel=transformed_fuel) - .pipe(self.transform_end) - ) - if self.clear_cached_dfs: - logger.debug( - f"{self.table_id.value}: Clearing cached dfs: " - f"{sorted(self._cached_dfs.keys())}" - ) - self._cached_dfs.clear() - return df - class HydroelectricPlantsTableTransformer(Ferc1AbstractTableTransformer): """A table transformer specific to the :ref:`core_ferc1__yearly_hydroelectric_plants_sched406` table.""" @@ -6026,9 +5958,7 @@ def ferc1_transform_asset_factory( """Create an asset that pulls in raw ferc Form 1 assets and applies transformations. This is a convenient way to create assets for tables that only depend on raw dbf, - raw xbrl instant and duration tables and xbrl metadata. For tables with additional - upstream dependencies, create a stand alone asset using an asset decorator. See - the core_ferc1__yearly_steam_plants_sched402 asset. + raw xbrl instant and duration tables and xbrl metadata. Args: table_name: The name of the table to create an asset for. @@ -6114,49 +6044,13 @@ def create_ferc1_transform_assets() -> list[AssetsDefinition]: """ assets = [] for table_name, tfr_class in FERC1_TFR_CLASSES.items(): - # Bespoke exception. fuel must come before steam b/c fuel proportions are used to - # aid in FERC plant ID assignment. - if table_name != "core_ferc1__yearly_steam_plants_sched402": - assets.append(ferc1_transform_asset_factory(table_name, tfr_class)) + assets.append(ferc1_transform_asset_factory(table_name, tfr_class)) return assets ferc1_assets = create_ferc1_transform_assets() -@asset(io_manager_key="pudl_sqlite_io_manager") -def core_ferc1__yearly_steam_plants_sched402( - clean_xbrl_metadata_json: dict[str, dict[str, list[dict[str, Any]]]], - raw_ferc1_dbf__f1_steam: pd.DataFrame, - raw_ferc1_xbrl__steam_electric_generating_plant_statistics_large_plants_402_duration: pd.DataFrame, - raw_ferc1_xbrl__steam_electric_generating_plant_statistics_large_plants_402_instant: pd.DataFrame, - core_ferc1__yearly_steam_plants_fuel_sched402: pd.DataFrame, -) -> pd.DataFrame: - """Create the clean core_ferc1__yearly_steam_plants_sched402 table. - - Args: - clean_xbrl_metadata_json: XBRL metadata json for all tables. - raw_ferc1_dbf__f1_steam: Raw f1_steam table. 
- raw_ferc1_xbrl__steam_electric_generating_plant_statistics_large_plants_402_duration: raw XBRL duration table. - raw_ferc1_xbrl__steam_electric_generating_plant_statistics_large_plants_402_instant: raw XBRL instant table. - core_ferc1__yearly_steam_plants_fuel_sched402: Transformed core_ferc1__yearly_steam_plants_fuel_sched402 table. - - Returns: - Clean core_ferc1__yearly_steam_plants_sched402 table. - """ - df = SteamPlantsTableTransformer( - xbrl_metadata_json=clean_xbrl_metadata_json[ - "core_ferc1__yearly_steam_plants_sched402" - ] - ).transform( - raw_dbf=raw_ferc1_dbf__f1_steam, - raw_xbrl_instant=raw_ferc1_xbrl__steam_electric_generating_plant_statistics_large_plants_402_instant, - raw_xbrl_duration=raw_ferc1_xbrl__steam_electric_generating_plant_statistics_large_plants_402_duration, - transformed_fuel=core_ferc1__yearly_steam_plants_fuel_sched402, - ) - return convert_cols_dtypes(df, data_source="ferc1") - - def other_dimensions(table_names: list[str]) -> list[str]: """Get a list of the other dimension columns across all of the transformers.""" # grab all of the dimensions columns that we are currently verifying as a part of diff --git a/src/pudl/transform/params/ferc1.py b/src/pudl/transform/params/ferc1.py index edf7d3593e..6f65815675 100644 --- a/src/pudl/transform/params/ferc1.py +++ b/src/pudl/transform/params/ferc1.py @@ -1314,6 +1314,7 @@ "combined cycle - 40%", "combined cycle ctg", "combined cycle oper", + "combined cycle operation", "combined cycle opern", "combined cycle plant:", "combined turbine", @@ -1337,6 +1338,7 @@ "gas turb/cumb. cyc", "gas turb/cumbus cycl", "gas turbine / steam", + "gas turbine-combine cycle", "gas turbine-combined cycle", "gas turbine/ steam", "gas turbine/steam", diff --git a/test/integration/record_linkage_test.py b/test/integration/record_linkage_test.py new file mode 100644 index 0000000000..c52141c7b5 --- /dev/null +++ b/test/integration/record_linkage_test.py @@ -0,0 +1,240 @@ +"""Test core record linkage functionality.""" +# ruff: noqa: S311 + +import random +import string + +import numpy as np +import pandas as pd +import pytest +from dagster import graph + +import pudl +from pudl.analysis.record_linkage.classify_plants_ferc1 import ( + _FUEL_COLS, + ferc_dataframe_embedder, +) +from pudl.analysis.record_linkage.link_cross_year import link_ids_cross_year +from pudl.transform.params.ferc1 import ( + CONSTRUCTION_TYPE_CATEGORIES, + PLANT_TYPE_CATEGORIES, + VALID_PLANT_YEARS, +) + +_RANDOM_GENERATOR = np.random.default_rng(12335) + +logger = pudl.logging_helpers.get_logger(__name__) + + +def _randomly_modify_string(input_str: str, k: int = 5) -> str: + """Generate up to k random edits of input string.""" + edit_types = ["add", "delete", "substitute"] + + # Easier to modify list than str + input_list = list(input_str) + + # Possible characters to select from when performing "add" or "substitute" + # Letters are included twice to increase odds of selecting a letter + characters = ( + string.digits + + string.ascii_letters + + string.ascii_letters + + string.punctuation + + string.whitespace + ) + + # Generate random number between 0-k many times and taking min + # This biases the number of edits to generally be low + num_edits = min(random.randrange(k) for i in range(10)) + for _ in range(num_edits): + edit = edit_types[random.randrange(3)] + position = random.randrange(len(input_str) - 1) + + if edit == "add": + input_list.insert(position, random.choice(characters)) + elif edit == "delete": + input_list.pop(position) + else: + 
input_list[position] = random.choice(characters) + + return "".join(input_list) + + +def _generate_fuel_cols(plant_type: str, size: int) -> pd.DataFrame: + fuel_cols = pd.DataFrame([[pd.NA] * len(_FUEL_COLS)] * size, columns=_FUEL_COLS) + if plant_type == "nuclear": + fuel_cols["nuclear_fraction_mmbtu"] = 1.0 + elif plant_type == "steam": + fuel_cols["coal_fraction_mmbtu"] = random.uniform(0.6, 1) + fuel_cols["gas_fraction_mmbtu"] = 1 - fuel_cols["coal_fraction_mmbtu"] + elif ( + plant_type == "internal_combustion" + or plant_type == "combustion_turbine" + or plant_type == "combined_cycle" + ): + fuel_cols["gas_fraction_mmbtu"] = random.uniform(0, 1) + fuel_cols["oil_fraction_mmbtu"] = 1 - fuel_cols["gas_fraction_mmbtu"] + + return fuel_cols + + +def _noisify(col: pd.Series, sigma: float = 0.01, probability: float = 1) -> pd.Series: + """Add random noise to a column.""" + noisy_rows = _RANDOM_GENERATOR.random(len(col)) > (1 - probability) + modifier_array = np.zeros(len(col)) + modifier_array[noisy_rows] += _RANDOM_GENERATOR.normal( + scale=sigma, size=sum(noisy_rows) + ) + return col + modifier_array + + +def _modify_categorical( + df: pd.DataFrame, col: str, categories: list, probability: float = 0.01 +) -> pd.DataFrame: + """Randomly modify categorical column using given probability.""" + modify_elements = _RANDOM_GENERATOR.random(len(df)) > (1 - probability) + df.loc[modify_elements, col] = _RANDOM_GENERATOR.choice(categories) + return df + + +def _generate_random_test_df( + default_plant_name: str, + size: int = 2022 - 1994, + plant_name_max_edits: int = 5, + plant_type=random.choice(list(PLANT_TYPE_CATEGORIES["categories"].keys())), + construction_type=random.choice( + list(CONSTRUCTION_TYPE_CATEGORIES["categories"].keys()) + ), + plant_type_error_prob: float = 0.01, + construction_type_error_prob: float = 0.01, + construction_year_error_prob: float = 0.01, + capacity_mean: float = 500.0, + capacity_sigma: float = 10.0, + capacity_change_prob: float = 0.01, + utility_id_error_prob: float = 0.01, + utility_id: int = random.randrange(1000), +): + """Generate a random input DataFrame for testing record linkage.""" + + generated_df = pd.DataFrame( + { + "base_plant_name": [default_plant_name] * size, + "plant_type": [plant_type] * size, + "report_year": list(range(1994, 1994 + size)), + "construction_type": [construction_type] * size, + "capacity_mw": [capacity_mean] * size, + "construction_year": [ + random.randrange( + VALID_PLANT_YEARS["lower_bound"], VALID_PLANT_YEARS["upper_bound"] + ) + ] + * size, + "utility_id_ferc1": [utility_id] * size, + } + ) + + # Add random edits to plant name + generated_df["plant_name_ferc1"] = generated_df["base_plant_name"].apply( + _randomly_modify_string + ) + + # Modify capacity rows based on probability and sigma + generated_df["capacity_mw"] = _noisify( + generated_df["capacity_mw"], + sigma=capacity_sigma, + probability=capacity_change_prob, + ) + + # Modify categorical columns + generated_df = _modify_categorical( + generated_df, + "construction_type", + list(CONSTRUCTION_TYPE_CATEGORIES["categories"]), + construction_type_error_prob, + ) + generated_df = _modify_categorical( + generated_df, + "construction_year", + list(range(VALID_PLANT_YEARS["lower_bound"], VALID_PLANT_YEARS["upper_bound"])), + construction_year_error_prob, + ) + generated_df = _modify_categorical( + generated_df, + "plant_type", + list(PLANT_TYPE_CATEGORIES["categories"]), + plant_type_error_prob, + ) + + # Generate vectors of fuel fractions + generated_df[_FUEL_COLS] = 
_generate_fuel_cols(plant_type, size) + + # Add minor noise to fuel fractions + for col in _FUEL_COLS: + generated_df[col] = _noisify(generated_df[col]) + + return generated_df + + +@pytest.fixture +def mock_ferc1_plants_df(): + """Returns a test DataFrame for use in generic record linkage testing.""" + # Creates test dataframes for a bunch of plants and concatenates them + return pd.concat( + [ + _generate_random_test_df("fox lake, mn"), + _generate_random_test_df("maalaea", capacity_mean=50.0), + _generate_random_test_df("colstrip 1 & 2", capacity_mean=700.0), + _generate_random_test_df("wyman 4", capacity_mean=600.0, size=5), + _generate_random_test_df("mcintosh", capacity_mean=300.0, size=6), + _generate_random_test_df("boulevard", capacity_mean=40.0, size=12), + _generate_random_test_df("eagle mountain", capacity_mean=400.0, size=11), + _generate_random_test_df("eagle", capacity_mean=150.0, size=14), + _generate_random_test_df("permian basin", capacity_mean=340.0), + _generate_random_test_df("lake hubbard", capacity_mean=450.0), + _generate_random_test_df("north lake", capacity_mean=800.0), + _generate_random_test_df("stryker creek", capacity_mean=850.0), + _generate_random_test_df("sewell creek", capacity_mean=900.0), + _generate_random_test_df("southeast chicago", capacity_mean=400.0, size=3), + _generate_random_test_df("mohave", capacity_mean=500.0, size=10), + _generate_random_test_df("el segundo", capacity_mean=600.0, size=13), + _generate_random_test_df("highgrove", capacity_mean=300.0, size=23), + _generate_random_test_df("cool water"), + _generate_random_test_df("huntington beach"), + _generate_random_test_df("long beach"), + _generate_random_test_df("san onofre 2&3"), + _generate_random_test_df("allen e. kintigh", capacity_mean=150), + _generate_random_test_df("hawthorn 6", capacity_mean=150), + _generate_random_test_df("venice c.t.", capacity_mean=500), + _generate_random_test_df("keystone *", capacity_mean=1872.0, size=3), + _generate_random_test_df("keystone", capacity_mean=50.0, size=21), + _generate_random_test_df( + "keystone 1&2 (3.70%)", capacity_mean=69.3, size=5 + ), + ] + ).reset_index() + + +def test_classify_plants_ferc1(mock_ferc1_plants_df): + """Test the FERC inter-year plant linking model.""" + + @graph + def _link_ids(df: pd.DataFrame): + feature_matrix = ferc_dataframe_embedder(df) + label_df = link_ids_cross_year(df, feature_matrix) + return label_df + + mock_ferc1_plants_df["plant_id_ferc1"] = ( + _link_ids.to_job() + .execute_in_process(input_values={"df": mock_ferc1_plants_df}) + .output_value()["record_label"] + ) + + # Compute percent of records assigned correctly + correctly_matched = ( + mock_ferc1_plants_df.groupby("base_plant_name")["plant_id_ferc1"] + .apply(lambda plant_ids: plant_ids.value_counts().iloc[0]) + .sum() + ) + ratio_correct = correctly_matched / len(mock_ferc1_plants_df) + logger.info(f"Percent correctly matched: {ratio_correct*100:.2f}%") + assert ratio_correct > 0.95, "Percent of correctly matched FERC records below 85%."
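The accuracy metric in the test above credits, for each synthetic plant, only the records that received that plant's most common predicted label. A tiny worked example with hypothetical labels makes the arithmetic concrete.

```python
# Hedged worked example of the scoring rule used in the test (hypothetical labels).
import pandas as pd

toy = pd.DataFrame(
    {
        "base_plant_name": ["a", "a", "a", "b", "b"],
        "plant_id_ferc1": [0, 0, 7, 1, 1],  # one record of plant "a" mislabeled
    }
)
correct = (
    toy.groupby("base_plant_name")["plant_id_ferc1"]
    .apply(lambda ids: ids.value_counts().iloc[0])  # size of majority label per plant
    .sum()
)
print(correct / len(toy))  # 4 of 5 records match their plant's majority label -> 0.8
```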