|
8 | 8 | from django.apps import apps |
9 | 9 | from django.conf import settings |
10 | 10 | from django.core.management.base import BaseCommand |
11 | | -from django.db import transaction |
12 | 11 | from django.utils import timezone |
13 | 12 |
|
14 | 13 | logger = logging.getLogger(__name__) |
@@ -140,113 +139,110 @@ def handle(self, *args, **options): |
140 | 139 | else: |
141 | 140 | self.stdout.write(f" Processing {backfill_count:,} records in batches of {batch_size:,}...") |
142 | 141 |
|
143 | | - # Process in batches using bulk_create |
| 142 | + # Process records one by one and bulk insert every batch_size records |
144 | 143 | processed = 0 |
145 | | - for start in range(0, backfill_count, batch_size): |
146 | | - end = min(start + batch_size, backfill_count) |
147 | | - filtered_batch = list(records_needing_backfill[start:end]) |
148 | | - |
149 | | - if not dry_run: |
150 | | - # Create events with preserved timestamps from original instances |
151 | | - event_records = [] |
152 | | - failed_records = [] |
153 | | - for instance in filtered_batch: |
154 | | - try: |
155 | | - # Create event record with all model fields |
156 | | - event_data = {} |
157 | | - |
158 | | - # Get excluded fields for this model from pghistory configuration |
159 | | - excluded_fields = self.get_excluded_fields(model_name) |
160 | | - |
161 | | - # Copy all fields from the instance to event_data, except excluded ones |
162 | | - for field in instance._meta.fields: |
163 | | - field_name = field.name |
164 | | - if field_name not in excluded_fields: |
165 | | - field_value = getattr(instance, field_name) |
166 | | - event_data[field_name] = field_value |
167 | | - |
168 | | - # Explicitly preserve created timestamp from the original instance |
169 | | - # Only if not excluded and exists |
170 | | - if hasattr(instance, "created") and instance.created and "created" not in excluded_fields: |
171 | | - event_data["created"] = instance.created |
172 | | - # Note: We don't preserve 'updated' for Product since it's excluded |
173 | | - |
174 | | - # Add pghistory-specific fields |
175 | | - event_data.update({ |
176 | | - "pgh_label": "initial_import", |
177 | | - "pgh_obj": instance, # ForeignKey to the original object |
178 | | - "pgh_context": None, # No context for backfilled events |
179 | | - }) |
180 | | - |
181 | | - # Set pgh_created_at to current time (this is for the event creation time) |
182 | | - # The created/updated fields above contain the original instance timestamps |
183 | | - event_data["pgh_created_at"] = timezone.now() |
184 | | - |
185 | | - event_records.append(EventModel(**event_data)) |
186 | | - |
187 | | - except Exception as e: |
188 | | - failed_records.append(instance.id) |
189 | | - logger.error( |
190 | | - f"Failed to prepare event for {model_name} " |
191 | | - f"ID {instance.id}: {e}", |
192 | | - ) |
193 | | - # Continue processing other records in the batch |
| 144 | + event_records = [] |
| 145 | + failed_records = [] |
| 146 | + |
| 147 | + for instance in records_needing_backfill.iterator(): |
| 148 | + try: |
| 149 | + # Create event record with all model fields |
| 150 | + event_data = {} |
| 151 | + |
| 152 | + # Get excluded fields for this model from pghistory configuration |
| 153 | + excluded_fields = self.get_excluded_fields(model_name) |
| 154 | + |
| 155 | + # Copy all fields from the instance to event_data, except excluded ones |
| 156 | + for field in instance._meta.fields: |
| 157 | + field_name = field.name |
| 158 | + if field_name not in excluded_fields: |
| 159 | + field_value = getattr(instance, field_name) |
| 160 | + event_data[field_name] = field_value |
| 161 | + |
| 162 | + # Explicitly preserve created timestamp from the original instance |
| 163 | + # Only if not excluded and exists |
| 164 | + if hasattr(instance, "created") and instance.created and "created" not in excluded_fields: |
| 165 | + event_data["created"] = instance.created |
| 166 | + # Note: We don't preserve 'updated' for Product since it's excluded |
| 167 | + |
| 168 | + # Add pghistory-specific fields |
| 169 | + event_data.update({ |
| 170 | + "pgh_label": "initial_import", |
| 171 | + "pgh_obj": instance, # ForeignKey to the original object |
| 172 | + "pgh_context": None, # No context for backfilled events |
| 173 | + }) |
| 174 | + |
| 175 | + # Set pgh_created_at to current time (this is for the event creation time) |
| 176 | + # The created/updated fields above contain the original instance timestamps |
| 177 | + event_data["pgh_created_at"] = timezone.now() |
| 178 | + |
| 179 | + event_records.append(EventModel(**event_data)) |
| 180 | + |
| 181 | + except Exception as e: |
| 182 | + failed_records.append(instance.id) |
| 183 | + logger.error( |
| 184 | + f"Failed to prepare event for {model_name} ID {instance.id}: {e}", |
| 185 | + ) |
194 | 186 |
|
195 | | - # Bulk create all events in this batch |
196 | | - if event_records: |
| 187 | + # Bulk create when we hit batch_size records |
| 188 | + if len(event_records) >= batch_size: |
| 189 | + if not dry_run and event_records: |
197 | 190 | try: |
198 | | - with transaction.atomic(): |
199 | | - # Temporarily disable auto_now and auto_now_add for accurate timestamp preservation |
200 | | - for field in EventModel._meta.fields: |
201 | | - if hasattr(field, "auto_now"): |
202 | | - field.auto_now = False |
203 | | - if hasattr(field, "auto_now_add"): |
204 | | - field.auto_now_add = False |
205 | | - |
206 | | - EventModel.objects.bulk_create( |
207 | | - event_records, |
208 | | - batch_size=batch_size, |
| 191 | + attempted = len(event_records) |
| 192 | + created_objects = EventModel.objects.bulk_create(event_records, batch_size=batch_size) |
| 193 | + actually_created = len(created_objects) if created_objects else 0 |
| 194 | + processed += actually_created |
| 195 | + |
| 196 | + if actually_created != attempted: |
| 197 | + logger.warning( |
| 198 | + f"bulk_create for {model_name}: attempted {attempted}, " |
| 199 | + f"actually created {actually_created} ({attempted - actually_created} skipped)", |
209 | 200 | ) |
210 | | - |
211 | | - # Restore auto_now and auto_now_add settings |
212 | | - for field in EventModel._meta.fields: |
213 | | - if field.name == "created" and hasattr(field, "auto_now_add"): |
214 | | - field.auto_now_add = True |
215 | | - if field.name == "updated" and hasattr(field, "auto_now"): |
216 | | - field.auto_now = True |
217 | 201 | except Exception as e: |
218 | | - logger.error( |
219 | | - f"Failed to bulk create events for {model_name}: {e}", |
220 | | - ) |
221 | | - # Re-raise the exception instead of falling back |
| 202 | + logger.error(f"Failed to bulk create events for {model_name}: {e}") |
222 | 203 | raise |
| 204 | + elif dry_run: |
| 205 | + processed += len(event_records) |
223 | 206 |
|
224 | | - # Count only successfully processed records |
225 | | - successful_in_batch = len(event_records) |
226 | | - failed_in_batch = len(failed_records) |
227 | | - processed += successful_in_batch |
228 | | - |
229 | | - if failed_in_batch > 0: |
230 | | - self.stdout.write( |
231 | | - self.style.WARNING( |
232 | | - f" Batch {start + 1}-{end}: {successful_in_batch} successful, " |
233 | | - f"{failed_in_batch} failed (IDs: {failed_records[:5]}{'...' if failed_in_batch > 5 else ''})", |
234 | | - ), |
235 | | - ) |
| 207 | + event_records = [] # Reset for next batch |
236 | 208 |
|
237 | | - self.stdout.write( |
238 | | - f" Processed {processed:,}/{backfill_count:,} records needing backfill " |
239 | | - f"({processed / backfill_count * 100:.1f}%)", |
240 | | - ) |
| 209 | + # Progress update |
| 210 | + progress = (processed / backfill_count) * 100 |
| 211 | + self.stdout.write(f" Processed {processed:,}/{backfill_count:,} records needing backfill ({progress:.1f}%)") |
| 212 | + |
| 213 | + # Handle remaining records |
| 214 | + if event_records: |
| 215 | + if not dry_run: |
| 216 | + try: |
| 217 | + attempted = len(event_records) |
| 218 | + created_objects = EventModel.objects.bulk_create(event_records, batch_size=batch_size) |
| 219 | + actually_created = len(created_objects) if created_objects else 0 |
| 220 | + processed += actually_created |
| 221 | + |
| 222 | + if actually_created != attempted: |
| 223 | + logger.warning( |
| 224 | + f"bulk_create final batch for {model_name}: attempted {attempted}, " |
| 225 | + f"actually created {actually_created} ({attempted - actually_created} skipped)", |
| 226 | + ) |
| 227 | + except Exception as e: |
| 228 | + logger.error(f"Failed to bulk create final batch for {model_name}: {e}") |
| 229 | + raise |
| 230 | + else: |
| 231 | + processed += len(event_records) |
| 232 | + |
| 233 | + # Final progress update |
| 234 | + if backfill_count > 0: |
| 235 | + progress = (processed / backfill_count) * 100 |
| 236 | + self.stdout.write(f" Processed {processed:,}/{backfill_count:,} records needing backfill ({progress:.1f}%)") |
241 | 237 |
|
242 | 238 | total_processed += processed |
243 | 239 |
|
244 | 240 | # Show completion summary |
245 | | - if processed < backfill_count: |
| 241 | + if failed_records: |
246 | 242 | self.stdout.write( |
247 | 243 | self.style.WARNING( |
248 | | - f" ⚠ Completed {model_name}: {processed:,}/{backfill_count:,} records " |
249 | | - f"({backfill_count - processed:,} records failed and will need to be retried)", |
| 244 | + f" ⚠ Completed {model_name}: {processed:,} records processed, " |
| 245 | + f"{len(failed_records)} records failed", |
250 | 246 | ), |
251 | 247 | ) |
252 | 248 | else: |
|
0 commit comments