-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrag_memecoin_edit_analysis_workflow.py
More file actions
143 lines (116 loc) · 5.55 KB
/
Copy pathrag_memecoin_edit_analysis_workflow.py
File metadata and controls
143 lines (116 loc) · 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
RAG Memecoin Edit Analysis Workflow - LLM-powered feedback analysis for memecoin editing.
Receives user feedback and original memecoin, returns edited proposal.
"""
import logging
from typing import Dict, Any
from src.domain.workflow.default_workflow import DefaultWorkflow
from src.domain.input_source.memory_memecoin_input_source import (
MemoryMemecoinInputSource,
)
from src.domain.processor.rag_memecoin_edit_processor import RAGMemecoinEditProcessor
from src.domain.executor.edit_proposal_executor import EditProposalExecutor
from src.services.ai.litellm_service import LiteLLMService
from src.constants import CONFIG_ROOT
from src.util.config_loader import load_config
from src.models.llm.litellm_config import LiteLLMSetup
from src.models.ai.processing_models import MemecoinEntry
logger = logging.getLogger(__name__)
class RAGMemecoinEditAnalysisWorkflow:
    """
    Workflow for analyzing user feedback and generating memecoin edit proposals.

    Architecture:
        MemoryMemecoinInputSource → RAGMemecoinEditProcessor → EditProposalExecutor

    ``run`` wires those three components into a ``DefaultWorkflow``, starts the
    input source, and then reads the edit proposal back from the executor. If
    the executor holds no proposal for the token, the processor's own
    ``get_edited_memecoin()`` result is used as a backward-compatible fallback.

    Use case: Orchestrator receives user feedback, runs this workflow, returns
    proposal to API.

    Lifecycle: ``await initialize()`` once, ``await run(...)`` per request,
    ``await cleanup()`` when finished.
    """

    def __init__(self):
        """Create an un-initialized workflow; call ``initialize()`` before ``run()``."""
        self.workflow = None
        self.input_source = None
        self.llm_service = None
        self.executor = None
        self.edited_memecoin = None  # Keep for backward compatibility

    async def initialize(self):
        """
        Build the long-lived components shared by every ``run()`` call.

        Loads ``litellm.yaml`` from ``CONFIG_ROOT`` into a ``LiteLLMSetup``,
        creates and initializes the ``LiteLLMService``, and creates the
        ``EditProposalExecutor``.
        """
        logger.info("🔧 Initializing Edit Analysis Workflow...")

        # Initialize LLM service
        logger.info("🤖 Initializing LLM service...")
        config_path = CONFIG_ROOT / "litellm.yaml"
        config = load_config(LiteLLMSetup, str(config_path))
        self.llm_service = LiteLLMService(config=config)
        await self.llm_service.initialize()
        logger.info("✅ LLM service initialized")

        # Initialize executor
        logger.info("⚙️ Initializing EditProposalExecutor...")
        self.executor = EditProposalExecutor()
        logger.info("✅ Executor initialized")

    async def run(
        self, memecoin_data: Dict[str, Any], user_feedback: str
    ) -> MemecoinEntry:
        """
        Run edit analysis workflow with user feedback.

        Args:
            memecoin_data: Original memecoin data dictionary. Must contain a
                non-empty ``token_address``; ``token_name`` is used for logging.
            user_feedback: User's free-form feedback about what needs changing

        Returns:
            MemecoinEntry with LLM-proposed edits applied

        Raises:
            ValueError: If memecoin_data has no (or an empty) ``token_address``
            RuntimeError: If ``initialize()`` has not been called, or if neither
                the executor nor the processor produced an edited entry

        Any other exception raised by the underlying components is logged and
        re-raised unchanged.
        """
        try:
            logger.info(
                f"🚀 Running edit analysis for: {memecoin_data.get('token_name', 'Unknown')}"
            )
            logger.info(f"📝 User feedback: {user_feedback[:100]}...")

            # Fail fast: without these the run can only end in an AttributeError
            # after the processing work has already been attempted.
            if self.llm_service is None or self.executor is None:
                raise RuntimeError(
                    "Edit analysis workflow is not initialized - call initialize() before run()"
                )

            # Validate the lookup key BEFORE running the workflow, so invalid
            # input is rejected without paying for the LLM-backed processing.
            token_address = memecoin_data.get("token_address")
            if not token_address:
                raise ValueError("memecoin_data must contain 'token_address' field")

            # Create input source with single memecoin
            self.input_source = MemoryMemecoinInputSource(memecoin_data=memecoin_data)

            # Create processor with user feedback
            processor = RAGMemecoinEditProcessor(
                llm_service=self.llm_service, user_feedback=user_feedback
            )

            # Create workflow with the executor that collects the proposal
            self.workflow = DefaultWorkflow(
                input_source=self.input_source,
                processor=processor,
                executor=self.executor,
            )

            # Initialize and run workflow
            await self.workflow.initialize()
            logger.info("✅ Workflow initialized, starting processing...")

            # Start input source to process single memecoin (will auto-stop after processing)
            await self.input_source.start_listening()

            # Retrieve edit proposal from executor (new architecture pattern).
            # NOTE(review): the executor instance is reused across run() calls;
            # whether get_proposal() can return a proposal left over from an
            # earlier run for the same token_address depends on
            # EditProposalExecutor - confirm.
            proposal = self.executor.get_proposal(token_address)
            if proposal:
                edited_entry = proposal.edited_entry
                self.edited_memecoin = edited_entry  # Keep for backward compatibility
                logger.info(
                    f"✅ Edit analysis complete for: {edited_entry.token_name}"
                )
                logger.info(f"   📝 Changes: {len(proposal.get_changes_summary())} field(s)")
                return edited_entry

            # Fallback to old method for backward compatibility
            logger.warning("⚠️ No proposal found in executor, falling back to processor method")
            edited_entry = processor.get_edited_memecoin()
            if edited_entry:
                self.edited_memecoin = edited_entry
                logger.info(f"✅ Edit analysis complete (fallback): {edited_entry.token_name}")
                return edited_entry

            raise RuntimeError(
                "Failed to retrieve edited memecoin - no proposal in executor and processor returned None"
            )
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception(f"❌ Edit analysis workflow failed: {e}")
            raise

    async def cleanup(self):
        """
        Clean up workflow resources.

        Stops the most recently created ``DefaultWorkflow`` if there is one.
        Best-effort: errors are logged (with traceback) and not re-raised.
        """
        try:
            if self.workflow:
                await self.workflow.stop()
            logger.info("🧹 Edit analysis workflow cleaned up")
        except Exception as e:
            logger.exception(f"❌ Error during cleanup: {e}")