-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcommands.py
More file actions
411 lines (347 loc) · 14.6 KB
/
commands.py
File metadata and controls
411 lines (347 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
"""Store commands for the do/undo functionality."""
import numpy as np
import pandas as pd
from PySide6.QtCore import QModelIndex, Qt
from PySide6.QtGui import QUndoCommand
# Opt in to the future pandas behaviour in which methods such as ``replace``
# and ``fillna`` no longer silently downcast object-dtype results.
pd.set_option("future.no_silent_downcasting", True)
def _convert_dtype_with_nullable_int(series, dtype):
"""Convert a series to the specified dtype, handling nullable integers.
When converting to integer types and the series contains NaN values,
this function automatically uses pandas nullable integer types (Int64,
Int32, etc.) instead of numpy integer types which don't support NaN.
Args:
series: The pandas Series to convert
dtype: The target dtype
Returns:
The series with the appropriate dtype applied
"""
# Check if it's already a pandas nullable int type
is_pandas_nullable_int = isinstance(
dtype,
pd.Int64Dtype | pd.Int32Dtype | pd.Int16Dtype | pd.Int8Dtype,
)
if is_pandas_nullable_int:
# Keep pandas nullable integer types as is
return series.astype(dtype)
# If column has NaN and dtype is integer, use nullable Int type
if np.issubdtype(dtype, np.integer) and series.isna().any():
# Convert numpy int types to pandas nullable Int types
if dtype == np.int64:
return series.astype("Int64")
if dtype == np.int32:
return series.astype("Int32")
if dtype == np.int16:
return series.astype("Int16")
if dtype == np.int8:
return series.astype("Int8")
# Fallback for other integer types
return series.astype("Int64")
return series.astype(dtype)
class ModifyColumnCommand(QUndoCommand):
    """Undoable insertion or deletion of a single table column.

    Pushed onto an undo stack, the command either appends a new column
    filled with empty strings, or drops an existing column while remembering
    its position and values so that it can be put back.
    """

    def __init__(self, model, column_name, add_mode: bool = True):
        """Set up the command.

        Args:
            model: The table model to modify
            column_name: The name of the column to add or remove
            add_mode: If True, add a column; if False, remove a column
        """
        action = "Add" if add_mode else "Remove"
        super().__init__(
            f"{action} column {column_name} in table {model.table_type}"
        )
        self.model = model
        self.column_name = column_name
        self.add_mode = add_mode
        self.old_values = None
        self.position = None

        if add_mode:
            return

        # Removal: snapshot location and contents for a later undo.
        frame = model._data_frame
        if column_name in frame.columns:
            self.position = frame.columns.get_loc(column_name)
            self.old_values = frame[column_name].copy()

    def redo(self):
        """Apply the command.

        In add mode a new column of empty strings is appended; in remove
        mode the named column is dropped from the data frame.
        """
        frame = self.model._data_frame

        if not self.add_mode:
            self.position = frame.columns.get_loc(self.column_name)
            self.model.beginRemoveColumns(
                QModelIndex(), self.position, self.position
            )
            frame.drop(columns=self.column_name, inplace=True)
            self.model.endRemoveColumns()
            return

        # New columns always go to the end of the frame.
        end = frame.shape[1]
        self.model.beginInsertColumns(QModelIndex(), end, end)
        frame[self.column_name] = ""
        self.model.endInsertColumns()

    def undo(self):
        """Revert the command.

        An added column is dropped again; a removed column is re-inserted
        at its recorded position with its recorded values.
        """
        frame = self.model._data_frame

        if not self.add_mode:
            self.model.beginInsertColumns(
                QModelIndex(), self.position, self.position
            )
            frame.insert(self.position, self.column_name, self.old_values)
            self.model.endInsertColumns()
            return

        where = frame.columns.get_loc(self.column_name)
        self.model.beginRemoveColumns(QModelIndex(), where, where)
        frame.drop(columns=self.column_name, inplace=True)
        self.model.endRemoveColumns()
class ModifyRowCommand(QUndoCommand):
    """Command to add or remove rows in the table.

    This command is used for undo/redo functionality when adding or removing
    rows in a table model.
    """

    def __init__(
        self, model, row_indices: list[int] | int, add_mode: bool = True
    ):
        """Initialize the command for adding or removing rows.

        Args:
            model: The table model to modify
            row_indices: If add_mode is True, the number of rows to add.
                If add_mode is False, the integer position(s) of the rows
                to remove.
            add_mode: If True, add rows; if False, remove rows
        """
        action = "Add" if add_mode else "Remove"
        super().__init__(f"{action} row(s) in table {model.table_type}")
        self.model = model
        self.add_mode = add_mode
        self.old_rows = None
        self.old_ind_names = None

        df = self.model._data_frame

        if add_mode:
            # Adding: interpret input as count of new rows
            self.row_indices = self._generate_new_indices(row_indices)
        else:
            # Removing: the input holds integer row positions (they are
            # used with ``iloc`` below); the matching labels are recorded
            # separately in ``old_ind_names``.
            self.row_indices = (
                row_indices if isinstance(row_indices, list) else [row_indices]
            )
            self.old_rows = df.iloc[self.row_indices].copy()
            self.old_ind_names = [df.index[idx] for idx in self.row_indices]

    def _generate_new_indices(self, count):
        """Generate ``count`` unused row labels for new rows.

        Labels have the form ``new_<table_type>_<n>``; candidates whose
        string form already occurs in the index are skipped. The result is
        also stored in ``self.old_ind_names``.

        Args:
            count: The number of labels to generate

        Returns:
            The list of generated labels
        """
        df = self.model._data_frame
        base = 0
        existing = set(df.index.astype(str))

        indices = []
        while len(indices) < count:
            idx = f"new_{self.model.table_type}_{base}"
            if idx not in existing:
                indices.append(idx)
            base += 1
        self.old_ind_names = indices
        return indices

    def redo(self):
        """Execute the command to add or remove rows.

        If in add mode, adds new rows to the table.
        If in remove mode, removes the specified rows from the table.
        Does nothing when there are no rows to add or remove.
        """
        if not self.row_indices:
            # An empty selection would make min()/max() fail and would
            # announce an invalid row range to the views.
            return

        df = self.model._data_frame

        if self.add_mode:
            position = (
                0 if df.empty else df.shape[0] - 1
            )  # insert *before* the auto-row
            self.model.beginInsertRows(
                QModelIndex(), position, position + len(self.row_indices) - 1
            )
            # save dtypes
            dtypes = df.dtypes.copy()

            for idx in self.row_indices:
                df.loc[idx] = [np.nan] * df.shape[1]

            # set dtypes: appending all-NaN rows may have changed them
            for col, dtype in dtypes.items():
                if dtype != df.dtypes[col]:
                    df[col] = _convert_dtype_with_nullable_int(df[col], dtype)
            self.model.endInsertRows()
        else:
            self.model.beginRemoveRows(
                QModelIndex(), min(self.row_indices), max(self.row_indices)
            )
            df.drop(index=self.old_ind_names, inplace=True)
            self.model.endRemoveRows()

    def undo(self):
        """Undo the command, reversing the add or remove operation.

        If the original command was to add rows, this removes them.
        If the original command was to remove rows, this restores them at
        their former positions and with the former column dtypes.
        Does nothing when there are no rows to add or remove.
        """
        if not self.row_indices:
            return

        df = self.model._data_frame

        if self.add_mode:
            positions = [df.index.get_loc(idx) for idx in self.row_indices]
            self.model.beginRemoveRows(
                QModelIndex(), min(positions), max(positions)
            )
            df.drop(index=self.old_ind_names, inplace=True)
            self.model.endRemoveRows()
        else:
            self.model.beginInsertRows(
                QModelIndex(), min(self.row_indices), max(self.row_indices)
            )
            # Dropping rows does not alter dtypes, so these are the dtypes
            # the frame had before the removal.
            dtypes = df.dtypes.copy()

            # ``itertuples`` hands back each cell with its own column's
            # type instead of one dtype shared by the whole row.
            removed = sorted(
                zip(
                    self.row_indices,
                    self.old_ind_names,
                    self.old_rows.itertuples(index=False, name=None),
                ),
                key=lambda entry: entry[0],
            )

            # Positions refer to the frame *before* removal, so labels must
            # be re-inserted in ascending position order to rebuild the
            # original ordering.
            restore_index_order = df.index
            for pos, index_name, row in removed:
                restore_index_order = restore_index_order.insert(
                    pos, index_name
                )
                df.loc[index_name] = list(row)
            df.sort_index(
                inplace=True,
                key=lambda x: x.map(restore_index_order.get_loc),
            )

            # Appending rows one by one may change column dtypes; all values
            # are original data again, so a plain cast back is sufficient.
            for col, dtype in dtypes.items():
                if dtype != df.dtypes[col]:
                    df[col] = df[col].astype(dtype)
            self.model.endInsertRows()
class ModifyDataFrameCommand(QUndoCommand):
    """Command to modify values in a DataFrame.

    This command is used for undo/redo functionality when modifying cell values
    in a table model.
    """

    def __init__(
        self, model, changes: dict[tuple, tuple], description="Modify values"
    ):
        """Initialize the command for modifying DataFrame values.

        Args:
            model:
                The table model to modify
            changes:
                A dictionary mapping (row_key, column_name) to (old_val, new_val)
            description:
                A description of the command for the undo stack
        """
        super().__init__(description)
        self.model = model
        self.changes = changes  # {(row_key, column_name): (old_val, new_val)}

    def redo(self):
        """Execute the command to apply the new values."""
        self._apply_changes(use_new=True)

    def undo(self):
        """Undo the command to restore the old values."""
        self._apply_changes(use_new=False)

    def _apply_changes(self, use_new: bool):
        """Apply changes to the DataFrame.

        The touched columns are temporarily cast to ``object``, updated,
        and then cast back to their original dtypes. Finally a single
        ``dataChanged`` signal covering the bounding rectangle of all
        changed cells is emitted.

        Args:
            use_new:
                If True, apply the new values; if False, restore the old values
        """
        df = self.model._data_frame
        col_offset = 1 if self.model._has_named_index else 0
        original_dtypes = df.dtypes.copy()

        # Apply changes
        update_vals = {
            (row, col): val[1 if use_new else 0]
            for (row, col), val in self.changes.items()
        }
        if not update_vals:
            return
        # Rows become the index and column names the columns
        update_df = pd.Series(update_vals).unstack()
        for col in update_df.columns:
            if col in df.columns:
                df[col] = df[col].astype("object")
        # ``DataFrame.update`` skips missing values, so ``None`` is routed
        # through a placeholder that is turned into "" afterwards.
        # NOTE(review): the second replace runs on the whole frame, so a
        # cell that genuinely contains "Placeholder_temp" is blanked too.
        update_df.replace({None: "Placeholder_temp"}, inplace=True)
        df.update(update_df)
        df.replace({"Placeholder_temp": ""}, inplace=True)

        for col, dtype in original_dtypes.items():
            if col not in update_df.columns:
                continue

            if pd.api.types.is_extension_array_dtype(dtype):
                # pandas extension dtypes cannot be passed to
                # ``np.issubdtype`` (it raises ``TypeError``), so they are
                # classified with pandas' own predicates and cast directly.
                if pd.api.types.is_numeric_dtype(
                    dtype
                ) and not pd.api.types.is_bool_dtype(dtype):
                    df[col] = pd.to_numeric(df[col], errors="coerce")
                df[col] = df[col].astype(dtype)
                continue

            # For numeric types, convert string inputs to numbers first
            if np.issubdtype(dtype, np.number):
                df[col] = pd.to_numeric(df[col], errors="coerce")
            # Convert to appropriate dtype, handling nullable integers
            df[col] = _convert_dtype_with_nullable_int(df[col], dtype)

        rows = [df.index.get_loc(row_key) for (row_key, _) in self.changes]
        cols = [
            df.columns.get_loc(col) + col_offset for (_, col) in self.changes
        ]

        top_left = self.model.index(min(rows), min(cols))
        bottom_right = self.model.index(max(rows), max(cols))
        self.model.dataChanged.emit(top_left, bottom_right, [Qt.DisplayRole])
class RenameIndexCommand(QUndoCommand):
    """Undoable rename of one row label of the model's DataFrame.

    Redo maps the old label to the new one; undo maps it back.
    """

    def __init__(self, model, old_index, new_index, model_index):
        """Set up the command.

        Args:
            model: The table model to modify
            old_index: The original index name
            new_index: The new index name
            model_index: The QModelIndex of the cell being edited
        """
        super().__init__(f"Rename index {old_index} → {new_index}")
        self.model = model
        self.model_index = model_index
        self.old_index = old_index
        self.new_index = new_index

    def redo(self):
        """Rename the row from the old label to the new label."""
        self._apply(src=self.old_index, dst=self.new_index)

    def undo(self):
        """Rename the row from the new label back to the old label."""
        self._apply(src=self.new_index, dst=self.old_index)

    def _apply(self, src, dst):
        """Relabel a row in place and notify the views.

        Args:
            src: The source index name to rename
            dst: The destination index name
        """
        frame = self.model._data_frame
        frame.rename(index={src: dst}, inplace=True)

        # Only the edited cell is reported as changed.
        cell = self.model_index
        self.model.dataChanged.emit(cell, cell, [Qt.DisplayRole])
class RenameValueCommand(QUndoCommand):
    """Command to rename values in specified columns.

    Every cell of the given columns that equals ``old_id`` when the command
    is created is switched to ``new_id`` on redo and back on undo.
    """

    def __init__(
        self, model, old_id: str, new_id: str, column_names: str | list[str]
    ):
        """Initialize the command and record the affected cells.

        Args:
            model: The table model to modify
            old_id: The value to be replaced
            new_id: The replacement value
            column_names: The column or columns to search for ``old_id``
        """
        super().__init__(f"Rename value {old_id} → {new_id}")
        self.model = model
        self.old_id = old_id
        self.new_id = new_id
        self.column_names = (
            column_names if isinstance(column_names, list) else [column_names]
        )
        self.changes = {}  # {(row_idx, col_name): (old_val, new_val)}

        df = self.model._data_frame
        for col_name in self.column_names:
            mask = df[col_name].eq(self.old_id)
            for row_idx in df.index[mask]:
                self.changes[(row_idx, col_name)] = (self.old_id, self.new_id)

    def redo(self):
        """Execute the command to write the new value."""
        self._apply_changes(use_new=True)

    def undo(self):
        """Undo the command to restore the old value."""
        self._apply_changes(use_new=False)

    def _apply_changes(self, use_new: bool):
        """Write the recorded cells and notify the model.

        Nothing is written or emitted when no cell matched ``old_id``.

        Args:
            use_new:
                If True, apply the new values; if False, restore the old values
        """
        if not self.changes:
            return

        df = self.model._data_frame
        for (row_idx, col_name), (old_val, new_val) in self.changes.items():
            df.at[row_idx, col_name] = new_val if use_new else old_val

        # Same column offset rule as ModifyDataFrameCommand: model columns
        # are shifted by one only when the index is shown as a column.
        # Models that do not define the flag keep the former fixed shift.
        col_offset = 1 if getattr(self.model, "_has_named_index", True) else 0

        rows = [df.index.get_loc(row) for (row, _) in self.changes]
        cols = [
            df.columns.get_loc(col) + col_offset for (_, col) in self.changes
        ]
        top_left = self.model.index(min(rows), min(cols))
        bottom_right = self.model.index(max(rows), max(cols))
        self.model.dataChanged.emit(
            top_left, bottom_right, [Qt.DisplayRole, Qt.EditRole]
        )
        self.model.something_changed.emit(True)