|
| 1 | +import unittest |
| 2 | +from unittest.mock import MagicMock, patch, call |
| 3 | +import pandas as pd |
| 4 | + |
| 5 | +from pownet.optim_model import variable_func |
| 6 | + |
| 7 | + |
| 8 | +class TestVariableFunctions(unittest.TestCase): |
| 9 | + |
| 10 | + def setUp(self): |
| 11 | + """Set up common test data and mocks.""" |
| 12 | + self.timesteps = range( |
| 13 | + 3 |
| 14 | + ) # Global timestep as per user request (implicitly via usage) |
| 15 | + self.units = ["gen_A", "gen_B"] |
| 16 | + self.edges = [("node1", "node2"), ("node2", "node3")] |
| 17 | + self.step_k = 1 # Default step_k, can be overridden in specific tests |
| 18 | + |
| 19 | + # Mock Gurobi model |
| 20 | + self.mock_model = MagicMock() |
| 21 | + # This mock will be returned by model.addVars |
| 22 | + self.mock_vars_tupledict = MagicMock() |
| 23 | + self.mock_model.addVars.return_value = self.mock_vars_tupledict |
| 24 | + |
| 25 | + # Dummy capacity DataFrame for functions where its internal structure |
| 26 | + # isn't directly tested by the function itself but passed to a mocked helper. |
| 27 | + self.dummy_capacity_df = pd.DataFrame({"dummy_col": [1, 2, 3]}) |
| 28 | + |
| 29 | + # More structured capacity DataFrame for update_flow_vars |
| 30 | + # Index needs to cover t + (step_k - 1) * 24 |
| 31 | + # For step_k=1, t=0,1,2 -> index 0,1,2 |
| 32 | + # For step_k=2, t=0,1,2 -> index 24,25,26 |
| 33 | + idx = pd.RangeIndex(start=0, stop=50, step=1) # Sufficient for a few steps |
| 34 | + data_for_flow_df = { |
| 35 | + self.edges[0]: [100 + i for i in range(50)], |
| 36 | + self.edges[1]: [200 + i for i in range(50)], |
| 37 | + } |
| 38 | + self.flow_capacity_df = pd.DataFrame(data_for_flow_df, index=idx) |
| 39 | + |
| 40 | + @patch("pownet.optim_model.variable_func.GRB") |
| 41 | + @patch("pownet.optim_model.variable_func.get_capacity_value") |
| 42 | + def test_add_var_with_variable_ub(self, mock_get_capacity_value, mock_grb_module): |
| 43 | + """Test the add_var_with_variable_ub function.""" |
| 44 | + var_name = "test_var" |
| 45 | + step_k_test = 2 |
| 46 | + |
| 47 | + mock_grb_module.CONTINUOUS = "MOCK_GRB_CONTINUOUS_TYPE" |
| 48 | + |
| 49 | + # Define a side effect for mock_get_capacity_value to simulate different capacities |
| 50 | + def capacity_side_effect(t, unit, sk, df): |
| 51 | + # sk is step_k, df is capacity_df |
| 52 | + # Simple unique value based on inputs for verification |
| 53 | + if unit == self.units[0]: |
| 54 | + return 100 + t + sk |
| 55 | + elif unit == self.units[1]: |
| 56 | + return 200 + t + sk |
| 57 | + return 0 |
| 58 | + |
| 59 | + mock_get_capacity_value.side_effect = capacity_side_effect |
| 60 | + |
| 61 | + # Expected upper bounds dictionary |
| 62 | + expected_ub_dict = {} |
| 63 | + for t_val in self.timesteps: |
| 64 | + for unit_val in self.units: |
| 65 | + expected_ub_dict[(unit_val, t_val)] = capacity_side_effect( |
| 66 | + t_val, unit_val, step_k_test, self.dummy_capacity_df |
| 67 | + ) |
| 68 | + |
| 69 | + # Call the function |
| 70 | + created_vars = variable_func.add_var_with_variable_ub( |
| 71 | + model=self.mock_model, |
| 72 | + varname=var_name, |
| 73 | + timesteps=self.timesteps, |
| 74 | + step_k=step_k_test, |
| 75 | + units=self.units, |
| 76 | + capacity_df=self.dummy_capacity_df, |
| 77 | + ) |
| 78 | + |
| 79 | + # Assert model.addVars was called correctly |
| 80 | + self.mock_model.addVars.assert_called_once_with( |
| 81 | + self.units, |
| 82 | + self.timesteps, |
| 83 | + lb=0, |
| 84 | + ub=expected_ub_dict, |
| 85 | + vtype="MOCK_GRB_CONTINUOUS_TYPE", # Check if the mocked GRB type is used |
| 86 | + name=var_name, |
| 87 | + ) |
| 88 | + |
| 89 | + # Assert that the function returns what model.addVars returned |
| 90 | + self.assertEqual(created_vars, self.mock_vars_tupledict) |
| 91 | + |
| 92 | + # Verify calls to get_capacity_value |
| 93 | + expected_calls_to_get_capacity = [] |
| 94 | + for t_val in self.timesteps: |
| 95 | + for unit_val in self.units: |
| 96 | + expected_calls_to_get_capacity.append( |
| 97 | + call(t_val, unit_val, step_k_test, self.dummy_capacity_df) |
| 98 | + ) |
| 99 | + mock_get_capacity_value.assert_has_calls( |
| 100 | + expected_calls_to_get_capacity, any_order=True |
| 101 | + ) |
| 102 | + self.assertEqual( |
| 103 | + mock_get_capacity_value.call_count, len(self.units) * len(self.timesteps) |
| 104 | + ) |
| 105 | + |
| 106 | + @patch("pownet.optim_model.variable_func.get_capacity_value") |
| 107 | + @patch("pownet.optim_model.variable_func.get_unit_hour_from_varname") |
| 108 | + def test_update_var_with_variable_ub( |
| 109 | + self, mock_get_unit_hour_from_varname, mock_get_capacity_value |
| 110 | + ): |
| 111 | + """Test the update_var_with_variable_ub function.""" |
| 112 | + step_k_test = 1 |
| 113 | + |
| 114 | + # Create mock Gurobi variables |
| 115 | + mock_gvar1 = MagicMock() |
| 116 | + mock_gvar1.VarName = ( |
| 117 | + f"{variable_func.VAR_PREFIX_THERMAL_GENERATION}_{self.units[0]}[0]" |
| 118 | + ) |
| 119 | + mock_gvar1.ub = 0 # Initial ub |
| 120 | + |
| 121 | + mock_gvar2 = MagicMock() |
| 122 | + mock_gvar2.VarName = ( |
| 123 | + f"{variable_func.VAR_PREFIX_THERMAL_GENERATION}_{self.units[1]}[1]" |
| 124 | + ) |
| 125 | + mock_gvar2.ub = 0 # Initial ub |
| 126 | + |
| 127 | + # Simulate a gp.tupledict by using a dictionary of these mocks |
| 128 | + # The function iterates over .values() |
| 129 | + mock_variables_dict = { |
| 130 | + (self.units[0], 0): mock_gvar1, |
| 131 | + (self.units[1], 1): mock_gvar2, |
| 132 | + } |
| 133 | + |
| 134 | + # Configure side effect for get_unit_hour_from_varname |
| 135 | + def unit_hour_side_effect(var_name): |
| 136 | + if var_name == mock_gvar1.VarName: |
| 137 | + return self.units[0], 0 |
| 138 | + elif var_name == mock_gvar2.VarName: |
| 139 | + return self.units[1], 1 |
| 140 | + return None, None # Should not happen with controlled inputs |
| 141 | + |
| 142 | + mock_get_unit_hour_from_varname.side_effect = unit_hour_side_effect |
| 143 | + |
| 144 | + # Configure side effect for get_capacity_value |
| 145 | + # Capacity depends on unit, t, and step_k |
| 146 | + expected_capacity_gvar1 = 150 |
| 147 | + expected_capacity_gvar2 = 250 |
| 148 | + |
| 149 | + def capacity_side_effect(t, unit, sk, df): |
| 150 | + self.assertEqual(sk, step_k_test) # Check step_k is passed correctly |
| 151 | + self.assertIs(df, self.dummy_capacity_df) # Check df is passed correctly |
| 152 | + if unit == self.units[0] and t == 0: |
| 153 | + return expected_capacity_gvar1 |
| 154 | + elif unit == self.units[1] and t == 1: |
| 155 | + return expected_capacity_gvar2 |
| 156 | + return 0 # Default, should not be hit with specific var names |
| 157 | + |
| 158 | + mock_get_capacity_value.side_effect = capacity_side_effect |
| 159 | + |
| 160 | + # Call the function |
| 161 | + # Pass the .values() if the function expects an iterable of Gurobi variables |
| 162 | + # The type hint is gp.tupledict, so we pass the dict itself. |
| 163 | + variable_func.update_var_with_variable_ub( |
| 164 | + variables=mock_variables_dict, |
| 165 | + step_k=step_k_test, |
| 166 | + capacity_df=self.dummy_capacity_df, |
| 167 | + ) |
| 168 | + |
| 169 | + # Assertions |
| 170 | + # Check get_unit_hour_from_varname calls |
| 171 | + mock_get_unit_hour_from_varname.assert_any_call(mock_gvar1.VarName) |
| 172 | + mock_get_unit_hour_from_varname.assert_any_call(mock_gvar2.VarName) |
| 173 | + self.assertEqual(mock_get_unit_hour_from_varname.call_count, 2) |
| 174 | + |
| 175 | + # Check get_capacity_value calls |
| 176 | + mock_get_capacity_value.assert_any_call( |
| 177 | + 0, self.units[0], step_k_test, self.dummy_capacity_df |
| 178 | + ) |
| 179 | + mock_get_capacity_value.assert_any_call( |
| 180 | + 1, self.units[1], step_k_test, self.dummy_capacity_df |
| 181 | + ) |
| 182 | + self.assertEqual(mock_get_capacity_value.call_count, 2) |
| 183 | + |
| 184 | + # Check if variable upper bounds were updated |
| 185 | + self.assertEqual(mock_gvar1.ub, expected_capacity_gvar1) |
| 186 | + self.assertEqual(mock_gvar2.ub, expected_capacity_gvar2) |
| 187 | + |
| 188 | + @patch("pownet.optim_model.variable_func.get_edge_hour_from_varname") |
| 189 | + def test_update_flow_vars(self, mock_get_edge_hour_from_varname): |
| 190 | + """Test the update_flow_vars function.""" |
| 191 | + step_k_test = 2 # Using a different step_k to test the time indexing |
| 192 | + line_capacity_factor = 0.9 |
| 193 | + hours_per_step = 24 # As defined in the source function |
| 194 | + |
| 195 | + # Create mock Gurobi flow variables |
| 196 | + mock_flow_var1 = MagicMock() |
| 197 | + # Example VarName format, assuming some prefix like "flow_" |
| 198 | + mock_flow_var1.VarName = f"flow_{self.edges[0][0]}_{self.edges[0][1]}[0]" |
| 199 | + mock_flow_var1.ub = 0 # Initial ub |
| 200 | + |
| 201 | + mock_flow_var2 = MagicMock() |
| 202 | + mock_flow_var2.VarName = ( |
| 203 | + f"flow_{self.edges[1][0]}_{self.edges[1][1]}[2]" # t=2 for this var |
| 204 | + ) |
| 205 | + mock_flow_var2.ub = 0 |
| 206 | + |
| 207 | + mock_flow_variables_dict = { |
| 208 | + (self.edges[0], 0): mock_flow_var1, |
| 209 | + (self.edges[1], 2): mock_flow_var2, |
| 210 | + } |
| 211 | + |
| 212 | + # Configure side effect for get_edge_hour_from_varname |
| 213 | + def edge_hour_side_effect(var_name): |
| 214 | + if var_name == mock_flow_var1.VarName: |
| 215 | + return self.edges[0], 0 # (edge_tuple, time_in_step) |
| 216 | + elif var_name == mock_flow_var2.VarName: |
| 217 | + return self.edges[1], 2 |
| 218 | + return None, None |
| 219 | + |
| 220 | + mock_get_edge_hour_from_varname.side_effect = edge_hour_side_effect |
| 221 | + |
| 222 | + # Call the function |
| 223 | + variable_func.update_flow_vars( |
| 224 | + flow_variables=mock_flow_variables_dict, # Pass dict, function iterates .values() |
| 225 | + step_k=step_k_test, |
| 226 | + capacity_df=self.flow_capacity_df, |
| 227 | + line_capacity_factor=line_capacity_factor, |
| 228 | + ) |
| 229 | + |
| 230 | + # Assertions |
| 231 | + # Check get_edge_hour_from_varname calls |
| 232 | + mock_get_edge_hour_from_varname.assert_any_call(mock_flow_var1.VarName) |
| 233 | + mock_get_edge_hour_from_varname.assert_any_call(mock_flow_var2.VarName) |
| 234 | + self.assertEqual(mock_get_edge_hour_from_varname.call_count, 2) |
| 235 | + |
| 236 | + # Calculate expected capacities and UBs |
| 237 | + # For flow_var1: edge=self.edges[0], t=0 |
| 238 | + time_idx1 = 0 + (step_k_test - 1) * hours_per_step # 0 + (2-1)*24 = 24 |
| 239 | + expected_capacity1 = self.flow_capacity_df.loc[time_idx1, self.edges[0]] |
| 240 | + expected_ub1 = expected_capacity1 * line_capacity_factor |
| 241 | + |
| 242 | + # For flow_var2: edge=self.edges[1], t=2 |
| 243 | + time_idx2 = 2 + (step_k_test - 1) * hours_per_step # 2 + (2-1)*24 = 26 |
| 244 | + expected_capacity2 = self.flow_capacity_df.loc[time_idx2, self.edges[1]] |
| 245 | + expected_ub2 = expected_capacity2 * line_capacity_factor |
| 246 | + |
| 247 | + # Check if flow variable upper bounds were updated |
| 248 | + self.assertEqual(mock_flow_var1.ub, expected_ub1) |
| 249 | + self.assertEqual(mock_flow_var2.ub, expected_ub2) |
| 250 | + |
| 251 | + |
| 252 | +# This allows running the tests directly from the script |
| 253 | +if __name__ == "__main__": |
| 254 | + |
| 255 | + # Ensure the variable prefix is set for the test |
| 256 | + if not hasattr(variable_func, "VAR_PREFIX_THERMAL_GENERATION"): |
| 257 | + variable_func.VAR_PREFIX_THERMAL_GENERATION = "thermal_generation" |
| 258 | + |
| 259 | + unittest.main() |
0 commit comments