-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerated_meme_edit_analysis_workflow.py
More file actions
291 lines (236 loc) · 11.9 KB
/
generated_meme_edit_analysis_workflow.py
File metadata and controls
291 lines (236 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
"""
GeneratedMemeEditAnalysisWorkflow - Intelligent editing workflow for generated memecoins
Analyzes user feedback and regenerates affected parts of generated memecoins.
Coordinates decision analysis, context retrieval, metadata regeneration, and image regeneration.
"""
import logging
from typing import Any, Dict
from src.constants import CONFIG_ROOT
from src.domain.workflow.default_workflow import DefaultWorkflow
from src.domain.input_source.memory_memecoin_input_source import MemoryMemecoinInputSource
from src.domain.processor.generated_meme_edit_processor import (
GeneratedMemeEditProcessor,
)
from src.domain.executor.generated_edit_proposal_executor import GeneratedEditProposalExecutor
from src.models.ai.processing_models import MemecoinEntry
from src.models.llm.litellm_config import LiteLLMSetup
from src.services.ai.image_generation_service import ImageGenerationService
from src.services.ai.litellm_service import LiteLLMService
from src.util.config_loader import load_config
# Module-level logger, named after this module, used by every method of the workflow below.
logger = logging.getLogger(__name__)
class GeneratedMemeEditAnalysisWorkflow:
    """
    Workflow for analyzing feedback and regenerating generated memecoin parts.

    Architecture:
        MemoryMemecoinInputSource → GeneratedMemeEditProcessor → GeneratedEditProposalExecutor

    Flow:
        1. Accept memecoin data + user feedback
        2. Create MemecoinEntry from data
        3. Run it through a DefaultWorkflow pipeline
        4. Retrieve the proposal from the executor
        5. Return the edited MemecoinEntry

    Use case: GenerationOrchestrator receives edit request → runs this workflow → returns proposal

    NOTE: ``run()`` stores per-run state on the instance and temporarily
    patches ``executor.execute``; a single instance is therefore not meant to
    execute several ``run()`` calls concurrently.
    """

    # Default number of seconds run() waits for the executor to signal completion.
    DEFAULT_TIMEOUT_SECONDS = 90.0

    def __init__(self):
        """Create an uninitialized workflow; call ``initialize()`` before ``run()``."""
        self.workflow = None
        self.input_source = None
        self.llm_service = None
        self.image_service = None
        self.executor = None
        self.processing_complete = None  # asyncio.Event for signaling async processing completion
        self.edited_memecoin = None  # Keep for backward compatibility
        self._last_session_id = None  # session_id of the last successful run()

    async def initialize(self):
        """
        Initialize workflow components (LLM service, image service, executor).

        Raises:
            RuntimeError: If any component fails to initialize (original error chained)
        """
        logger.info("🔧 Initializing Generated Meme Edit Analysis Workflow...")

        try:
            # Initialize LLM service
            logger.info("🤖 Initializing LLM service...")
            config_path = CONFIG_ROOT / "litellm.yaml"
            config = load_config(LiteLLMSetup, str(config_path))
            self.llm_service = LiteLLMService(config=config)
            await self.llm_service.initialize()
            logger.info("✅ LLM service initialized")

            # Initialize Image Generation service
            logger.info("🎨 Initializing Image Generation service...")
            self.image_service = ImageGenerationService()
            logger.info("✅ Image Generation service initialized")

            # Initialize executor
            logger.info("⚙️ Initializing GeneratedEditProposalExecutor...")
            self.executor = GeneratedEditProposalExecutor()
            logger.info("✅ Executor initialized")

            logger.info("✅ Workflow initialization complete")

        except Exception as e:
            logger.error(f"❌ Workflow initialization failed: {e}")
            raise RuntimeError(f"Failed to initialize workflow: {e}") from e

    async def run(
        self,
        memecoin_data: Dict[str, Any],
        user_feedback: str,
        *,
        timeout: float = DEFAULT_TIMEOUT_SECONDS,
    ) -> "MemecoinEntry":
        """
        Run edit analysis workflow with user feedback.

        Args:
            memecoin_data: Current memecoin data dictionary
            user_feedback: User's feedback about what needs changing
            timeout: Seconds to wait for the executor to signal completion
                (keyword-only, defaults to ``DEFAULT_TIMEOUT_SECONDS``)

        Returns:
            MemecoinEntry with proposed edits applied

        Raises:
            RuntimeError: If the workflow is not initialized, the data is invalid,
                processing times out, or no proposal is produced. Every failure,
                including the ValueError raised for invalid ``memecoin_data``, is
                re-raised as RuntimeError with the original exception chained.
        """
        # Function-scope import: asyncio is not imported at module level in this file.
        import asyncio

        patched_executor = None  # executor whose ``execute`` we replaced, if any
        saved_execute = None  # the callable that must be put back afterwards

        try:
            logger.info(
                f"🚀 Running edit analysis for: {memecoin_data.get('token_name', 'Unknown')}"
            )
            logger.info(f"📝 User feedback: {user_feedback[:100]}...")

            # Fail with a clear message instead of an AttributeError on None.
            if self.executor is None:
                raise RuntimeError("Workflow not initialized - call initialize() before run()")

            # Convert dict to MemecoinEntry
            memecoin_entry = self._dict_to_memecoin_entry(memecoin_data)

            # Get session_id from data; a missing or null value falls back to "unknown"
            session_id = memecoin_data.get("session_id") or "unknown"
            logger.info(f"🔑 Using session_id: {session_id} for proposal caching")

            # Create input source with single memecoin and session_id
            self.input_source = MemoryMemecoinInputSource(
                memecoin_data=memecoin_entry.model_dump(),
                session_id=session_id,  # Pass session_id for tracking
            )

            # Create processor with feedback
            processor = GeneratedMemeEditProcessor(
                llm_service=self.llm_service,
                image_service=self.image_service,
                user_feedback=user_feedback,
            )

            # Per-run completion event. The wrapper below closes over this local
            # object, so it can never signal an event belonging to another run.
            completion = asyncio.Event()
            self.processing_complete = completion
            logger.info("🔔 Created completion event for async synchronization")

            # Wrap executor.execute to signal completion when proposal is cached
            executor = self.executor
            saved_execute = executor.execute

            async def execute_with_signal(action):
                """Wrapper that signals completion after caching proposal"""
                result = await saved_execute(action)
                logger.info(f"✅ Proposal cached, signaling completion for session: {session_id}")
                completion.set()  # Signal that processing is complete
                return result

            executor.execute = execute_with_signal
            patched_executor = executor

            # Create workflow WITH executor
            self.workflow = DefaultWorkflow(
                input_source=self.input_source,
                processor=processor,
                executor=executor,
            )

            # Initialize and run workflow
            await self.workflow.initialize()
            logger.info("✅ Workflow initialized, starting processing...")

            # Start input source to process the single memecoin
            await self.input_source.start_listening()

            # start_listening() returning does not imply the edit has been processed,
            # so wait for the executor wrapper to signal completion via the event.
            logger.info("⏳ Waiting for async edit processing to complete...")
            try:
                await asyncio.wait_for(completion.wait(), timeout=timeout)
                logger.info("✅ Async processing completed successfully")
            except asyncio.TimeoutError as exc:
                logger.error(f"❌ Edit processing timed out for session {session_id}")
                logger.error(f"📊 Cached proposals in executor: {list(self.executor.proposals.keys())}")
                raise RuntimeError(
                    f"Edit processing timed out after {timeout:g} seconds for session {session_id}"
                ) from exc

            # Retrieve edit proposal from executor
            logger.info(f"📊 Cached session_ids in executor: {list(self.executor.proposals.keys())}")
            logger.info(f"🔍 Retrieving proposal for session_id: {session_id}")
            proposal = self.executor.get_proposal(session_id)

            if not proposal:
                raise RuntimeError(
                    f"Failed to retrieve edit proposal for session {session_id} - no proposal in executor"
                )

            edited_entry = proposal.edited_entry
            self.edited_memecoin = edited_entry  # Keep for backward compatibility
            self._last_session_id = session_id  # Lets get_edit_summary() find this proposal

            logger.info(f"✅ Edit analysis complete for: {edited_entry.token_name}")
            logger.info(f" 📝 Strategy: {proposal.edit_strategy}")
            logger.info(f" 📋 Affected fields: {', '.join(proposal.affected_fields)}")
            logger.info(f" 🖼️ Image regenerated: {proposal.image_regenerated}")

            return edited_entry

        except Exception as e:
            logger.error(f"❌ Edit analysis workflow failed: {e}")
            raise RuntimeError(f"Edit analysis failed: {e}") from e

        finally:
            # Always undo the monkey-patch so repeated runs do not stack wrappers
            # and a failed/timed-out run does not leave a stale wrapper installed.
            if patched_executor is not None:
                patched_executor.execute = saved_execute

    def _dict_to_memecoin_entry(self, data: Dict[str, Any]) -> "MemecoinEntry":
        """
        Convert dictionary to MemecoinEntry.

        Args:
            data: Memecoin data dictionary

        Returns:
            MemecoinEntry instance

        Raises:
            ValueError: If ``token_name`` or ``ticker`` is missing/empty, or if
                MemecoinEntry construction fails (original error chained)
        """
        try:
            # Required fields (missing and empty values are both rejected)
            for required in ("token_name", "ticker"):
                if not data.get(required):
                    raise ValueError(f"{required} is required")

            # Build MemecoinEntry, defaulting every optional field
            return MemecoinEntry(
                token_name=data.get("token_name", ""),
                ticker=data.get("ticker", ""),
                description=data.get("description", ""),
                tags=data.get("tags", []),
                tags_categories=data.get("tags_categories", []),
                image_base64=data.get("image_base64", ""),
                image_mime_type=data.get("image_mime_type", "image/png"),
                created_at=data.get("created_at", 0),
                token_address=data.get("token_address", ""),
                caption=data.get("caption", ""),
                caption_structured=data.get("caption_structured"),
            )

        except Exception as e:
            logger.error(f"❌ Failed to convert dict to MemecoinEntry: {e}")
            raise ValueError(f"Invalid memecoin data: {e}") from e

    def get_edited_memecoin(self) -> "MemecoinEntry":
        """
        Get the edited memecoin entry stored by the last successful ``run()``.

        Returns:
            Edited MemecoinEntry

        Raises:
            RuntimeError: If workflow hasn't run yet
        """
        if self.edited_memecoin is None:
            raise RuntimeError("No edited memecoin available - workflow not run yet")
        return self.edited_memecoin

    def get_edit_summary(self) -> Dict[str, Any]:
        """
        Get summary of the edit operation.

        The proposal of the last successful ``run()`` on this instance is used
        when it is still cached in the executor; otherwise the last key of
        ``executor.proposals`` is used.

        Returns:
            Dictionary with edit summary:
            - strategy: Edit strategy used
            - image_regenerated: Whether image was regenerated
            - affected_fields: List of fields that were changed
            - metadata_regenerated: Whether metadata was regenerated

        Raises:
            RuntimeError: If workflow hasn't run yet or no proposals available
        """
        if self.executor is None or not hasattr(self.executor, "proposals"):
            raise RuntimeError("No executor available - workflow not initialized")

        proposals = self.executor.proposals
        if not proposals:
            raise RuntimeError("No edit proposals available - workflow not run yet")

        # Insertion order is not recency when a session_id is reused, so prefer
        # the id recorded by run(). getattr keeps instances that bypassed
        # __init__ working.
        session_id = getattr(self, "_last_session_id", None)
        if session_id is None or session_id not in proposals:
            session_id = list(proposals.keys())[-1]

        proposal = proposals[session_id]

        return {
            "strategy": proposal.edit_strategy,
            "image_regenerated": proposal.image_regenerated,
            "affected_fields": proposal.affected_fields,
            "metadata_regenerated": proposal.metadata_regenerated,
        }

    async def cleanup(self):
        """
        Clean up workflow resources.

        Currently this only logs; no resource is actually released here.
        Errors are logged and not propagated.
        """
        try:
            if self.llm_service:
                logger.debug("Cleaning up LLM service...")
                # LLM service cleanup if needed

            logger.info("🧹 Workflow cleanup complete")

        except Exception as e:
            logger.error(f"❌ Error during cleanup: {e}")