-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerated_meme_edit_processor.py
More file actions
291 lines (239 loc) · 11.3 KB
/
generated_meme_edit_processor.py
File metadata and controls
291 lines (239 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
"""
GeneratedMemeEditProcessor - Orchestrates the 6-stage edit workflow for generated memecoins.
Follows the standard 3-phase architecture: InputSource → Processor → Executor.
Stage Pipeline (linear, in this order):
1. EditDecisionStage - Analyzes feedback and determines regeneration strategy
2. EntityExtractionStage - Extracts entities from metadata + feedback (multimodal when an image is present)
3. MemeEditPlanningStage - Creates a dual edit plan: an image plan (entity-to-image mapping) and a metadata plan
4. ImageRegenerationStage - Regenerates the image using the image edit plan
5. MetadataRegenerationStage - Regenerates text fields using the metadata plan (conditional)
6. EditOutputStage - Creates GeneratedEditProposalAction
Outputs GeneratedEditProposalAction for GeneratedEditProposalExecutor.
"""
import logging
from typing import Any, Callable, Optional

from src.domain.model.actions.generated_edit_proposal_action import (
    GeneratedEditProposalAction,
)
from src.domain.model.events.memecoin_read_event import MemecoinReadEvent
from src.domain.processor.base_stage_processor import BaseStageProcessor
from src.domain.processor.stage_graph import StageGraph, StageNode
from src.domain.processor.stages.edit_decision_stage import EditDecisionStage
from src.domain.processor.stages.edit_output_stage import EditOutputStage
from src.domain.processor.stages.entity_extraction_stage import EntityExtractionStage
from src.domain.processor.stages.image_regeneration_stage import (
    ImageRegenerationStage,
)
from src.domain.processor.stages.meme_edit_planning_stage import (
    MemeEditPlanningStage,
)
from src.domain.processor.stages.metadata_regeneration_stage import (
    MetadataRegenerationStage,
)
from src.models.ai.processing_models import ProcessingContext
from src.services.ai.image_generation_service import ImageGenerationService
from src.services.ai.litellm_service import LiteLLMService
from src.services.entity.entity_database_service import get_entity_database_service
# Module-level logger, named after this module's dotted import path.
logger = logging.getLogger(__name__)
class GeneratedMemeEditProcessor(
    BaseStageProcessor[MemecoinReadEvent, GeneratedEditProposalAction]
):
    """
    Edit processor for generated memecoins using the standard 3-phase architecture.

    This processor wires six stages into a linear StageGraph
    (see ``_create_stage_graph``):

    **Stage 1: EditDecisionStage**
    - Analyzes user feedback using the LLM
    - Determines the regeneration strategy
    - Identifies problematic entities
    - Selects affected fields

    **Stage 2: EntityExtractionStage**
    - Extracts entities from metadata + user feedback
    - Runs multimodally when the memecoin has an image (``process`` seeds
      ``image_base64`` into the context for it)
    - Loads reference images from EntityDatabaseService
    - Provides visual context for accurate regeneration

    **Stage 3: MemeEditPlanningStage**
    - Uses the LLM to analyze feedback + extracted entities
    - Produces a DUAL plan: an image edit plan (entity-to-image mapping,
      visual attribute checklists, prioritized corrections) and a metadata
      edit plan for the later stages

    **Stage 4: ImageRegenerationStage**
    - Regenerates the image using the image edit plan
    - Uses enhanced prompts with explicit entity enumeration
    - Strong biasing language for exact replication
    - Current image as primary reference + entity refs

    **Stage 5: MetadataRegenerationStage (Conditional)**
    - Regenerates affected metadata fields using the metadata plan
    - Incorporates user feedback
    - Receives the regenerated image to stay consistent with it
    - The graph always routes here; any skip logic presumably lives inside
      the stage itself (confirm in MetadataRegenerationStage)

    **Stage 6: EditOutputStage**
    - Creates GeneratedEditProposalAction and stores it in
      ``context.results["edit_proposal_action"]``, where ``_extract_output``
      reads it
    - Determines affected fields
    - Packages the edit summary

    **Key Features:**
    - Follows the BaseStageProcessor architecture
    - Uses StageGraph for orchestration
    - Two-stage planning + execution for precise image edits
    - Entity-specific visual attribute replication
    - Proper ProcessingContext data flow
    - Logging throughout the pipeline
    """

    def __init__(
        self,
        llm_service: LiteLLMService,
        image_service: ImageGenerationService,
        user_feedback: str,
        processor_name: Optional[str] = None,
        executor_router=None,
    ):
        """
        Initialize the edit processor with its required services.

        Args:
            llm_service: LLM service shared by every LLM-backed stage.
            image_service: Image generation service used by ImageRegenerationStage.
            user_feedback: The user's edit request/feedback text; injected into
                the processing context on every ``process`` call.
            processor_name: Custom processor name for logging; defaults to
                ``"GeneratedMemeEditProcessor"``.
            executor_router: Optional executor router for action routing,
                passed through to the base processor.
        """
        # Services must be stored before building the graph, because
        # _create_stage_graph reads them from self.
        self.llm_service = llm_service
        self.image_service = image_service
        self.user_feedback = user_feedback
        self.entity_database_service = get_entity_database_service()

        stage_graph = self._create_stage_graph()

        super().__init__(
            processor_name=processor_name or "GeneratedMemeEditProcessor",
            llm_service=llm_service,
            stage_graph=stage_graph,
            executor_router=executor_router,
        )

    async def process(
        self,
        input_data: MemecoinReadEvent,
        action_handler: Optional[Callable[..., Any]] = None,
    ) -> Optional[GeneratedEditProposalAction]:
        """
        Run the edit stage graph for a single memecoin.

        This override seeds the processing context with the data the stages
        read before the graph executes:

        - ``results["memecoin_entry"]``: the memecoin to edit (from ``input_data``)
        - ``results["user_feedback"]``: the edit request (from ``self.user_feedback``)
        - ``results["image_base64"]``: only when the memecoin has an image
        - ``metadata["session_id"]``: copied from the event's ``source_metadata``
          when present

        Args:
            input_data: MemecoinReadEvent containing the memecoin to edit.
            action_handler: Accepted for signature compatibility with the base
                processor. NOTE(review): this override never invokes it —
                confirm whether action execution is expected here.

        Returns:
            The GeneratedEditProposalAction on success. Returns None if
            processing was terminated or no valid action was produced.

        Raises:
            StageExecutionError: if the input fails validation, if no stage
                graph is configured, or if anything raises while the graph
                executes or the output is extracted. In the last case the
                original exception is chained as ``__cause__``.
        """
        from src.models.ai.processing_models import StageExecutionError

        if not self._validate_input(input_data):
            raise StageExecutionError(
                "Invalid input data", context={"input_type": type(input_data).__name__}
            )
        if not self._stage_graph:
            raise StageExecutionError(
                "No stage graph configured", context={"processor": self.processor_name}
            )

        context = ProcessingContext(input_data=input_data)
        memecoin_entry = input_data.memecoin_entry

        # Stages read these keys from context.results, so they must be present
        # before the graph starts.
        context.results["memecoin_entry"] = memecoin_entry
        context.results["user_feedback"] = self.user_feedback

        # EntityExtractionStage uses the current image for multimodal extraction.
        if memecoin_entry.has_image:
            context.results["image_base64"] = memecoin_entry.image_base64
            logger.debug("📷 Populated image_base64 for multimodal entity extraction")

        # Copy session_id from the event's source_metadata into context metadata.
        # Treat a None source_metadata as empty instead of crashing on `in`.
        source_metadata = input_data.source_metadata or {}
        if "session_id" in source_metadata:
            session_id = source_metadata["session_id"]
            context.metadata["session_id"] = session_id
            logger.info(f"📋 Extracted session_id from event: {session_id}")

        try:
            final_context = await self._stage_graph.execute(context)
            return self._extract_output(final_context)
        except Exception as e:
            raise StageExecutionError(
                f"Processing failed in {self.processor_name}",
                context={"error": str(e), "input_type": type(input_data).__name__},
            ) from e

    def _create_stage_graph(self) -> StageGraph:
        """
        Create the linear 6-stage graph for edit processing.

        Stage Order:
        1. EditDecisionStage - Feedback analysis and strategy selection
        2. EntityExtractionStage - Entity extraction from metadata + feedback
        3. MemeEditPlanningStage - LLM-powered DUAL plan creation (image + metadata)
        4. ImageRegenerationStage - Image regeneration using the image plan
        5. MetadataRegenerationStage - Metadata regeneration using the metadata
           plan (receives the regenerated image)
        6. EditOutputStage - Final action creation

        Returns:
            A StageGraph whose entry point is EditDecisionStage.
        """
        decision_stage = EditDecisionStage(llm_service=self.llm_service)
        # Extracts entities from metadata + feedback; multimodal if an image is present.
        entity_extraction_stage = EntityExtractionStage(
            llm_service=self.llm_service,
            entity_database_service=self.entity_database_service,
        )
        # Produces both the image plan and the metadata plan.
        planning_stage = MemeEditPlanningStage(llm_service=self.llm_service)
        image_stage = ImageRegenerationStage(
            llm_service=self.llm_service,
            image_service=self.image_service,
        )
        metadata_stage = MetadataRegenerationStage(llm_service=self.llm_service)
        output_stage = EditOutputStage()

        # next_stages refers to stages by name. These strings must match each
        # stage's registered name (presumably its class name; confirm in StageGraph).
        nodes = [
            StageNode(stage=decision_stage, next_stages=["EntityExtractionStage"]),
            StageNode(
                stage=entity_extraction_stage,
                next_stages=["MemeEditPlanningStage"],
            ),
            StageNode(stage=planning_stage, next_stages=["ImageRegenerationStage"]),
            StageNode(stage=image_stage, next_stages=["MetadataRegenerationStage"]),
            StageNode(stage=metadata_stage, next_stages=["EditOutputStage"]),
            StageNode(stage=output_stage, next_stages=[]),  # Final stage
        ]
        return StageGraph(nodes=nodes, entry_point="EditDecisionStage")

    def _extract_output(
        self, context: Optional[ProcessingContext[MemecoinReadEvent]]
    ) -> Optional[GeneratedEditProposalAction]:
        """
        Extract the final GeneratedEditProposalAction from the processing context.

        Args:
            context: Final processing context after all stages complete. May be
                None if stage graph execution returned nothing.

        Returns:
            The GeneratedEditProposalAction when the edit succeeded. Returns
            None if the context is missing, processing was terminated, or
            ``results["edit_proposal_action"]`` is absent or of the wrong type.
        """
        if context is None:
            logger.error("❌ Stage graph execution returned None context")
            return None

        if not context.should_continue_processing:
            termination_reason = context.termination_reason or "Unknown reason"
            logger.info(f"🛑 Edit processing terminated: {termination_reason}")
            return None

        edit_action = context.results.get("edit_proposal_action")
        # isinstance already rejects None, so no separate falsy check is needed.
        if not isinstance(edit_action, GeneratedEditProposalAction):
            logger.error(
                "❌ No valid GeneratedEditProposalAction found in context after successful processing"
            )
            return None

        # A summary log line must never turn a successful edit into a failure.
        # `process` wraps this call in its try/except, so slicing a None
        # session_id would surface as StageExecutionError.
        session_id = edit_action.session_id
        session_prefix = str(session_id)[:8] if session_id is not None else "unknown"
        logger.info(f"✅ Edit processing complete - session: {session_prefix}")
        logger.info(f" Strategy: {edit_action.edit_strategy}")
        logger.info(
            f" Affected fields: {', '.join(edit_action.affected_fields) if edit_action.affected_fields else 'none'}"
        )
        logger.info(f" Image regenerated: {edit_action.image_regenerated}")
        return edit_action