diff --git a/message_ix/tools/add_year/__init__.py b/message_ix/tools/add_year/__init__.py index 00d3aff29..a96c16daa 100644 --- a/message_ix/tools/add_year/__init__.py +++ b/message_ix/tools/add_year/__init__.py @@ -1,17 +1,17 @@ -# -*- coding: utf-8 -*- -"""Add model years to an existing Scenario.""" -# Sections of the code: -# -# I. Required python packages are imported -# II. Generic utilities for dataframe manipulation -# III. The main function, add_year() -# IV. Function add_year_set() for adding and modifying the sets -# V. Function add_year_par() for copying and modifying each parameter -# VI. Two utility functions, interpolate_1d() and interpolate_2d(), for -# calculating missing values +"""Add model years to an existing Scenario. + +Sections of the code: + + I. Generic utilities for dataframe manipulation. + II. The main function, add_year(). +III. Function add_year_set() for adding and modifying the sets. + IV. Function add_year_par() for copying and modifying each parameter. + V. Two utility functions, interpolate_1d() and interpolate_2d(), for + calculating missing values. +""" import logging -from typing import Literal +from typing import Literal, TypeVar import numpy as np import pandas as pd @@ -22,14 +22,13 @@ log = logging.getLogger(__name__) -# %% II) Utility functions for dataframe manupulation +InterpolationType = TypeVar("InterpolationType", float, pd.Series, pd.DataFrame) + + +# I) Utility functions for dataframe manipulation def intpol( - y1: float | pd.Series | pd.DataFrame, - y2: float | pd.Series | pd.DataFrame, - x1: int, - x2: int, - x: int, -) -> float | pd.Series | pd.DataFrame: + y1: InterpolationType, y2: InterpolationType, x1: int, x2: int, x: int +) -> InterpolationType: """Interpolate between (*x1*, *y1*) and (*x2*, *y2*) at *x*. Parameters @@ -39,6 +38,7 @@ def intpol( """ if x2 == x1 and y2 != y1: log.warning("No difference between x1 and x2, returned empty") + assert isinstance(y1, pd.Series) return pd.Series([]) elif x2 == x1 and y2 == y1: return y1 @@ -101,9 +101,7 @@ def unit_uniform(df: pd.DataFrame) -> pd.DataFrame: return df -# %% III) The main function - - +# II) The main function def add_year( sc_ref: Scenario, sc_new: Scenario, @@ -156,12 +154,11 @@ def add_year( point for interpolation (e.g., permitting the extension of a bound to 2025, when there is only one value in 2020). """ - # III.A) Adding sets and required modifications + # II.A) Adding sets and required modifications years_new = sorted([x for x in years_new if str(x) not in set(sc_ref.set("year"))]) add_year_set(sc_ref, sc_new, years_new, firstyear_new, lastyear_new, baseyear_macro) - # ------------------------------------------------------------------------- - # III.B) Adding parameters and calculating the missing values for the - # additonal years + + # II.B) Adding parameters and calculating the missing values for the additonal years par_list: list[str] if parameter in ("all", ["all"]): par_list = sorted(sc_ref.par_list()) @@ -178,7 +175,8 @@ def add_year( reg_list: list[str] if region in ("all", ["all"]): - nodes: pd.Series = sc_ref.set("node") # type: ignore + nodes = sc_ref.set("node") + assert isinstance(nodes, pd.Series) reg_list = nodes.tolist() elif isinstance(region, list): reg_list = region @@ -188,8 +186,7 @@ def add_year( ) reg_list = [] - # List of parameters to be ignored (even not copied to the new - # scenario) + # List of parameters to be ignored (even not copied to the new scenario) par_ignore = ["duration_period"] par_list = [x for x in par_list if x not in par_ignore] @@ -219,12 +216,11 @@ def add_year( cat_year_new: pd.DataFrame = sc_new.set("cat_year") firstmodelyear_new = cat_year_new.query("type_year == 'firstmodelyear'") - firstyr_new: int = ( + firstyr_new = ( int(min(cat_year_new["year"])) if firstmodelyear_new.empty else int(firstmodelyear_new["year"].item()) - ) # type: ignore - # assert isinstance(firstyear_new, int) + ) cat_year_ref: pd.DataFrame = sc_new.set("cat_year") firstmodelyear_ref = cat_year_ref.query("type_year == 'firstmodelyear'") @@ -283,8 +279,8 @@ def add_year( log.info("All required parameters were successfully added to the new scenario") -# %% Submodules needed for running the main function -# IV) Adding new years to sets +# Submodules needed for running the main function +# III) Adding new years to sets # FIXME reduce complexity 14 → ≤13 def add_year_set( # noqa: C901 sc_ref: Scenario, @@ -303,20 +299,20 @@ def add_year_set( # noqa: C901 See :meth:`add_year` for parameter descriptions. """ - # IV.A) Treatment of the additional years in the year-related sets + # III.A) Treatment of the additional years in the year-related sets # A.1. Set - year yrs_old = list(map(int, sc_ref.set("year"))) horizon_new = sorted(yrs_old + years_new) sc_new.add_set("year", [str(yr) for yr in horizon_new]) - # A.2. Set _ type_year + # A.2. Set - type_year yr_typ = sc_ref.set("type_year").tolist() sc_new.add_set("type_year", sorted(yr_typ + [str(yr) for yr in years_new])) - # A.3. Set _ cat_year + # A.3. Set - cat_year yr_cat = sc_ref.set("cat_year") - # A.4. Changing the first year if needed + # A.4. Change the first year if needed if firstyear_new: if not yr_cat.loc[yr_cat["type_year"] == "firstmodelyear"].empty: yr_cat.loc[yr_cat["type_year"] == "firstmodelyear", "year"] = firstyear_new @@ -328,8 +324,7 @@ def add_year_set( # noqa: C901 else: yr_cat.loc[len(yr_cat.index)] = ["lastmodelyear", lastyear_new] - # A.5. Changing the base year and initialization year of macro if a new - # year specified + # A.5. Change the base year and initialization year of macro if a new year specified if baseyear_macro: if not yr_cat.loc[yr_cat["type_year"] == "baseyear_macro", "year"].empty: yr_cat.loc[yr_cat["type_year"] == "baseyear_macro", "year"] = baseyear_macro @@ -352,7 +347,7 @@ def add_year_set( # noqa: C901 .reset_index(drop=True) ) - # A.6. Changing the cumulative years based on the new first model year + # A.6. Change the cumulative years based on the new first model year if "firstmodelyear" in set(yr_cat["type_year"]): firstyear_new = int(yr_cat.loc[yr_cat["type_year"] == "firstmodelyear", "year"]) yr_cat = yr_cat.drop( @@ -362,7 +357,7 @@ def add_year_set( # noqa: C901 ) sc_new.add_set("cat_year", yr_cat) - # IV.B) Copying all other sets + # III.B) Copy all other sets set_list = [s for s in sc_ref.set_list() if "year" not in s] # Sets with one index set index_list = [x for x in set_list if not isinstance(sc_ref.set(x), pd.DataFrame)] @@ -372,7 +367,6 @@ def add_year_set( # noqa: C901 sc_new.add_set(set_name, sc_ref.set(set_name).tolist()) # The rest of the sets - for set_name in [x for x in set_list if x not in index_list]: new_set = [x for x in sc_ref.idx_sets(set_name) if x not in sc_ref.set_list()] if set_name not in sc_new.set_list() and not new_set: @@ -391,7 +385,7 @@ def next_step_bigger_than_previous(x: list[int], i: int) -> bool: return x[i + 1] - x[i] > x[i] - x[i - 1] -# %% V) Adding new years to parameters +# IV) Adding new years to parameters def add_year_par( sc_ref: Scenario, sc_new: Scenario, @@ -414,7 +408,7 @@ def add_year_par( See :meth:`add_year` for parameter descriptions. """ - # V.A) Initialization and checks + # IV.A) Initialization and checks par_list_new = sc_new.par_list() idx_names = sc_ref.idx_names(parname) horizon = sorted([int(x) for x in list(set(sc_ref.set("year")))]) @@ -461,7 +455,7 @@ def add_year_par( ) return - # Sorting the data to make it ready for dataframe manipulation + # Sort the data to make it ready for dataframe manipulation sort_order = [x for x in sort_order if x in idx_names] if sort_order: par_old = par_old.sort_values(sort_order).reset_index(drop=True) @@ -479,9 +473,9 @@ def add_year_par( # A uniform "unit" for values in different years if "unit" in par_old.columns and unit_check: par_old = unit_uniform(par_old) - # --------------------------------------------------------------------------- - # V.B) Adding new years to a parameter based on time-related indexes - # V.B.1) Parameters with no time index + + # IV.B) Adding new years to a parameter based on time-related indexes + # IV.B.1) Parameters with no time index if len(year_list) == 0: sc_new.add_par(parname, par_old) sc_new.commit(parname) @@ -490,7 +484,7 @@ def add_year_par( "entries" ) - # V.B.2) Parameters with one index related to time + # IV.B.2) Parameters with one index related to time elif len(year_list) == 1: year_col = year_list[0] df = par_old.copy() @@ -508,7 +502,7 @@ def add_year_par( sc_new.commit(" ") log.info(f"Parameter {parname} copied and new years added for node(s): {nodes}") - # V.B.3) Parameters with two indexes related to time (such as 'input') + # IV.B.3) Parameters with two indexes related to time (such as 'input') elif len(year_list) == 2: year_col = "year_act" year_ref = [x for x in year_list if x != year_col][0] @@ -557,9 +551,7 @@ def add_year_par( log.info(f"Parameter {parname} copied and new years added for node(s): {nodes}") -# %% VI) Required functions - - +# V) Required functions # FIXME reduce complexity 18 → ≤13 def interpolate_1d( # noqa: C901 df: pd.DataFrame, @@ -620,7 +612,7 @@ def interpolate_1d( # noqa: C901 for yr in yrs_new: extrapol = True if yr > max(horizon) else extrapolate - # a) If this new year greater than modeled years, do extrapolation + # a) If this new year is greater than modeled years, do extrapolation if yr > max(df2_int_column_list) and extrapol: if yr == horizon_new[horizon_new.index(max(df2_int_column_list)) + 1]: year_pre = max([x for x in df2_int_column_list if x < yr]) @@ -667,7 +659,7 @@ def interpolate_1d( # noqa: C901 elif bound_extend and cond: df2[yr] = df2[year_next] - # c) Otherise, do intrapolation + # c) Otherwise, do intrapolation elif yr > min(df2_int_column_list) and yr < max(df2_int_column_list): year_pre = max([x for x in df2_int_column_list if x < yr]) year_next = min([x for x in df2_int_column_list if x > yr]) @@ -675,8 +667,6 @@ def interpolate_1d( # noqa: C901 # Extrapolate for new years if the value exists for the previous year but # not for the next years - # TODO: here is the place that should be changed if the new year should go - # to the time step before the existing one if [x for x in df2_int_column_list if x > year_next]: year_nn = min([x for x in df2_int_column_list if x > year_next]) df2[yr] = df2[yr].fillna( @@ -787,9 +777,7 @@ def i1d_genno( return result.to_dataframe().reset_index().assign(unit=result.units) -# %% VI.B) Interpolating parameters with two dimensions related to time - - +# V.B) Interpolating parameters with two dimensions related to time # FIXME reduce complexity 38 → ≤13 def interpolate_2d( # noqa: C901 df: pd.DataFrame, @@ -802,7 +790,7 @@ def interpolate_2d( # noqa: C901 value_col: str = "value", extrapolate: bool = False, extrapol_neg: float | None = None, - year_diff: list[int] | None = None, + year_diff: list[int] = [], bound_extend: bool = True, ): """Interpolate parameters with two dimensions related year. @@ -854,8 +842,7 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: df2_tec = df_tec.pivot_table(index=idx, columns=year_col, values="value") df2_int_column_list = [int(column) for column in df2.columns] - # ------------------------------------------------------------------------- - # First, changing the time interval for the transition period + # First, change the time interval for the transition period # (e.g., year 2010 in old R11 model transits from 5 year to 10 year) horizon_new = sorted(horizon + [x for x in yrs_new if x not in horizon]) yr_diff_new = [ @@ -864,16 +851,22 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: if next_step_bigger_than_previous(horizon_new, horizon_new.index(x)) ] - # Generating duration_period_sum matrix for masking + if year_diff and tec_list: + yr_diff_new = [x for x in yr_diff_new if x not in year_diff] + + # Generate duration_period_sum matrix for masking + dur_list = list(np.diff(horizon_new)) + dur_list.insert(0, dur_list[0]) + dur = pd.DataFrame(index=horizon_new, data=dur_list) df_dur = pd.DataFrame( index=horizon_new[:-1], columns=[str(year) for year in horizon_new[1:]] ) for i in df_dur.index.astype(str): for j in [x for x in df_dur.columns if x > i]: - df_dur.loc[i, j] = int(j) - int(i) + df_dur.loc[i, j] = int(dur.loc[(dur.index >= i) & (dur.index < j)].sum()) - # Adding data for new transition year - if yr_diff_new and tec_list and year_diff not in yr_diff_new: + # Add data for new transition year + if yr_diff_new and tec_list: yrs = [x for x in horizon if x <= yr_diff_new[0]] year_next = min([x for x in df2_int_column_list if x > yr_diff_new[0]]) df_yrs = slice_df(df2_tec, idx, year_ref, yrs, None) @@ -906,12 +899,7 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: d[d.isnull() & d_n.notnull()] = d_n df2.loc[df2.index.isin(d.index), :] = d - cond1 = df_dur.index <= yr_diff_new[0] - cond2 = df_dur.columns.astype(int) >= year_next - subt = yr_diff_new[0] - horizon_new[horizon_new.index(yr_diff_new[0]) - 1] - df_dur.loc[cond1, cond2] = df_dur.loc[cond1, cond2] - subt - # ------------------------------------------------------------------------- - # Second, adding year_act of new years if year_vtg is in existing years + # Second, add year_act of new years if year_vtg is in existing years for yr in yrs_new: extrapol = True if yr > max(horizon) else extrapolate @@ -942,8 +930,8 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: df2[yr] = intpol(df2[year_pre], df2[year_next], year_pre, year_next, yr) df2_t = df2.loc[df2_tec.index, :].copy() - # This part calculates the missing value if only the previous - # timestep has a value (and not the next) + # This part calculates the missing value if only the previous timestep has + # a value (and not the next) if tec_list: cond = (pd.isna(df2_t[yr])) & (~pd.isna(df2_t[year_pre])) df2_t.loc[cond, yr] = intpol( @@ -954,7 +942,7 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: yr, ) - # Treating technologies with phase-out in model years + # Treat technologies with phase-out in model years if [x for x in df2_int_column_list if x < year_pre]: year_pp = max([x for x in df2_int_column_list if x < year_pre]) cond3 = (pd.isna(df2_t[yr])) & (~pd.isna(df2_t[year_pre])) @@ -970,8 +958,7 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: df2.loc[np.isinf(df2[year_pre]), yr] = df2[year_pre] df2 = df2.reindex(sorted([column for column in df2.columns]), axis=1) - # ------------------------------------------------------------------------- - # Third, adding year_vtg of new years + # Third, add year_vtg of new years for yr in yrs_new: # a) If this new year is greater than modeled years, do extrapolation if yr > max(horizon): @@ -981,13 +968,13 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: df_pp = slice_df(df2, idx, year_ref, [year_pp], yr) df_yr: pd.DataFrame = intpol( df_pre, idx_check(df_pp, df_pre), year_pre, year_pp, yr - ) # type: ignore + ) df_yr[np.isinf(df_pre)] = df_pre # For those technolofies with one value for each year df_to_fill_nans: pd.DataFrame = intpol( df_pre, df_pp.shift(+1, axis=1), year_pre, year_pp, yr - ) # type: ignore + ) df_yr.loc[pd.isna(df_yr[yr])] = df_to_fill_nans.shift(+1, axis=1) df_yr[pd.isna(df_yr)] = df_pre @@ -1011,8 +998,7 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: continue if not df_yr.empty and yr not in df_yr.columns: df_yr[yr] = np.nan - # TODO: here is the place that should be changed if the - # new year should go to the time step before the existing one + # If the new year should go to the time step before if bound_extend and not df_next.empty: df_yr[yr] = df_yr[year_next] elif bound_extend and not df_pre.empty: @@ -1022,7 +1008,7 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: df_yr[yr] = df_yr[yr].fillna(df_yr[[year_pre, year_next]].mean(axis=1)) df_yr[np.isinf(df_pre)] = df_pre - # Creating a mask to remove extra values + # Create a mask to remove extra values df_count = pd.DataFrame( { "c_pre": df_pre.count(axis=1), @@ -1052,7 +1038,7 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: df2 = pd.concat([df2, df_yr]) df2 = df2.reindex(sorted(df2.columns), axis=1).sort_index() - # ------------------------------------------------------------------------- + # Forth: final masking based on technical lifetime if tec_list and not df_dur.empty: df3 = ( @@ -1083,7 +1069,7 @@ def idx_check(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: for i in df3.index: df2.loc[i, df3.loc[i, :] >= int(df3.loc[i, "lifetime"])] = np.nan - # Removing extra values from non-lifetime technologies + # Remove extra values from non-lifetime technologies for i in [x for x in df2.index if x not in df3.index]: condition = df2.loc[[i]].index.get_level_values(year_ref)[0] df2.loc[i, df2.columns > condition] = np.nan