Skip to content

Commit 637211b

Browse files
committed
feat: add LLM insights automation + adjust Airflow schedule
- DAG schedule: 2h30 → 6h (optimized for free tier) - New task: generate_insights (writes to gold_ai_insights table) - Airflow → Databricks SQL Warehouse → Groq API → Gold layer - Remove GitHub Actions deploy (manual astro deploy -f strategy) - Add deployment strategy doc (pragmatic hybrid approach) - Dependencies: databricks-sql-connector, openai client - Rate limit handling + MERGE to avoid duplicates Made-with: Cursor
1 parent ffb7244 commit 637211b

5 files changed

Lines changed: 392 additions & 35 deletions

File tree

.github/workflows/deploy-airflow.yml

Lines changed: 0 additions & 27 deletions
This file was deleted.

airflow/dags/dag_reddit_scheduled.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
DataRadar — DAG agendada de ingestão do Reddit (a cada 2h30).
2+
DataRadar — DAG agendada de ingestão do Reddit (a cada 6h).
33
44
Lê a lista de subreddits da Airflow Variable `devradar_subreddits`
55
(JSON array) e executa o pipeline completo: extração → validação →
@@ -14,6 +14,7 @@
1414
- max_pages = 2 (menos requests por subreddit)
1515
- top_k_comments = 20 (menos requests de comentários)
1616
- upload_s3 = true (sempre sobe para o S3)
17+
- schedule = 6h (otimizado para free tier Astronomer)
1718
"""
1819

1920
from __future__ import annotations
@@ -27,15 +28,21 @@
2728

2829
from airflow.decorators import task
2930
from airflow.models import Variable
30-
3131
from airflow import DAG
3232

33+
# Import da task de insights (mesmo diretório)
34+
from task_generate_insights import generate_insights
35+
3336
sys.path.insert(0, "/opt/airflow/scripts")
3437

3538
logger = logging.getLogger(__name__)
3639

3740
LOCAL_DATA_DIR = Path("/opt/airflow/data")
3841
S3_BUCKET = os.getenv("DEVRADAR_S3_BUCKET", "devradar-raw")
42+
DATABRICKS_HOST = os.getenv("DATABRICKS_HOST", "")
43+
DATABRICKS_TOKEN = os.getenv("DATABRICKS_TOKEN", "")
44+
DATABRICKS_WAREHOUSE_ID = os.getenv("DATABRICKS_WAREHOUSE_ID", "")
45+
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
3946

4047
REQUIRED_FIELDS = {"id", "subreddit", "title", "created_utc"}
4148

@@ -340,8 +347,8 @@ def upload_to_s3(file_info: dict) -> str:
340347
with DAG(
341348
dag_id="devradar_reddit_scheduled",
342349
default_args=DEFAULT_ARGS,
343-
description="[PROD] Extrai Reddit a cada 2h30 — subreddits via Variable",
344-
schedule=timedelta(hours=2, minutes=30),
350+
description="[PROD] Extrai Reddit a cada 6h — subreddits via Variable",
351+
schedule=timedelta(hours=6), # Era 2h30, ajustado para free tier Astronomer
345352
start_date=datetime(2025, 1, 1),
346353
catchup=False,
347354
tags=["devradar", "reddit", "scheduled", "prod"],
@@ -354,4 +361,9 @@ def upload_to_s3(file_info: dict) -> str:
354361
validated_results = validate.expand(result=raw_results)
355362
saved_results = save_local.expand(result=validated_results)
356363
comment_results = extract_and_save_comments.expand(save_result=saved_results)
357-
upload_to_s3.expand(file_info=comment_results)
364+
uploads = upload_to_s3.expand(file_info=comment_results)
365+
366+
# Nova task: gera insights via LLM e escreve em gold_ai_insights (Databricks)
367+
# Roda APÓS todos os uploads S3 (Databricks processa S3 → Silver → depois insights Gold)
368+
insights_task = generate_insights()
369+
uploads >> insights_task
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
"""Generate AI insights task for Airflow DAG - writes to gold_ai_insights table."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
import logging
7+
import os
8+
import re
9+
import time
10+
from datetime import datetime
11+
12+
from airflow.decorators import task
13+
14+
logger = logging.getLogger(__name__)
15+
16+
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
17+
DATABRICKS_HOST = os.getenv("DATABRICKS_HOST", "")
18+
DATABRICKS_TOKEN = os.getenv("DATABRICKS_TOKEN", "")
19+
DATABRICKS_WAREHOUSE_ID = os.getenv("DATABRICKS_WAREHOUSE_ID", "")
20+
21+
MODEL = "llama-3.1-8b-instant"
22+
DELAY_BETWEEN_CALLS = 2
23+
24+
PROMPT_TEMPLATE = """Analyze posts/comments from r/{subreddit}. Return JSON with 3 categories:
25+
- trending_tools: tools/libs/frameworks mentioned (key: "name")
26+
- pain_points: problems/frustrations discussed (key: "topic")
27+
- solutions: recommendations proposed (key: "topic")
28+
29+
Each item: name/topic (max 5 words), mentions (int), context (1 sentence in Portuguese BR).
30+
Top 3 per category. Empty array if none. ONLY valid JSON, no markdown.
31+
32+
Schema: {{"trending_tools":[{{"name":"...","mentions":N,"context":"..."}}],"pain_points":[{{"topic":"...","mentions":N,"context":"..."}}],"solutions":[{{"topic":"...","mentions":N,"context":"..."}}]}}
33+
34+
--- r/{subreddit} DATA ---
35+
{content}
36+
"""
37+
38+
39+
def _execute_databricks_query(query: str) -> list[dict]:
40+
"""Execute SQL query on Databricks and return results as dicts."""
41+
from databricks import sql
42+
43+
with sql.connect(
44+
server_hostname=DATABRICKS_HOST,
45+
http_path=f"/sql/1.0/warehouses/{DATABRICKS_WAREHOUSE_ID}",
46+
access_token=DATABRICKS_TOKEN,
47+
) as conn:
48+
with conn.cursor() as cursor:
49+
cursor.execute(query)
50+
columns = [desc[0] for desc in cursor.description]
51+
return [dict(zip(columns, row)) for row in cursor.fetchall()]
52+
53+
54+
def _get_subreddits_with_data() -> list[str]:
55+
"""Get list of subreddits that have data in Silver."""
56+
results = _execute_databricks_query(
57+
"SELECT subreddit, COUNT(*) as cnt FROM devradar_silver_posts "
58+
"GROUP BY subreddit HAVING cnt >= 3 ORDER BY cnt DESC"
59+
)
60+
return [r["subreddit"] for r in results]
61+
62+
63+
def _get_content_for_subreddit(
64+
sub: str,
65+
posts_limit: int = 15,
66+
comments_limit: int = 20,
67+
max_chars: int = 6000,
68+
) -> str:
69+
"""Fetch posts and comments content from Silver tables."""
70+
posts = _execute_databricks_query(
71+
f"SELECT title, selftext FROM devradar_silver_posts "
72+
f"WHERE subreddit = '{sub}' ORDER BY score DESC LIMIT {posts_limit}"
73+
)
74+
75+
comments = _execute_databricks_query(
76+
f"SELECT body FROM devradar_silver_comments "
77+
f"WHERE subreddit = '{sub}' ORDER BY score DESC LIMIT {comments_limit}"
78+
)
79+
80+
parts = []
81+
for p in posts:
82+
text = p.get("title", "")
83+
if p.get("selftext"):
84+
text += f" | {p['selftext'][:150]}"
85+
parts.append(text)
86+
87+
for c in comments:
88+
if c.get("body"):
89+
parts.append(c["body"][:120])
90+
91+
return "\n".join(parts)[:max_chars]
92+
93+
94+
def _call_groq(subreddit: str, content: str) -> dict | None:
95+
"""Call Groq API to generate insights."""
96+
from openai import OpenAI
97+
98+
client = OpenAI(base_url="https://api.groq.com/openai/v1", api_key=GROQ_API_KEY)
99+
prompt = PROMPT_TEMPLATE.format(subreddit=subreddit, content=content)
100+
101+
for attempt in range(3):
102+
try:
103+
resp = client.chat.completions.create(
104+
model=MODEL,
105+
messages=[{"role": "user", "content": prompt}],
106+
temperature=0.2,
107+
max_tokens=800,
108+
response_format={"type": "json_object"},
109+
)
110+
text = resp.choices[0].message.content
111+
parsed = json.loads(text)
112+
113+
if isinstance(parsed, dict) and {
114+
"trending_tools", "pain_points", "solutions"
115+
}.intersection(parsed.keys()):
116+
return parsed
117+
118+
logger.warning(f"r/{subreddit}: Estrutura inesperada, tentativa {attempt+1}")
119+
continue
120+
121+
except Exception as e:
122+
err_str = str(e)
123+
if "429" in err_str:
124+
wait = 60
125+
match = re.search(r"try again in (\d+(?:\.\d+)?)s", err_str)
126+
if match:
127+
wait = int(float(match.group(1))) + 1
128+
logger.warning(f"r/{subreddit}: Rate limit, aguardando {wait}s...")
129+
time.sleep(wait)
130+
continue
131+
132+
logger.error(f"r/{subreddit}: Erro Groq - {err_str[:150]}")
133+
return None
134+
135+
return None
136+
137+
138+
def _write_insights_to_gold(subreddit: str, insights: dict, execution_date: str) -> None:
139+
"""Write insights to gold_ai_insights table."""
140+
from databricks import sql
141+
142+
rows = []
143+
for insight_type in ["trending_tools", "pain_points", "solutions"]:
144+
items = insights.get(insight_type, [])
145+
for item in items:
146+
if insight_type == "trending_tools":
147+
item_name = item.get("name", "")
148+
else:
149+
item_name = item.get("topic", "")
150+
151+
rows.append((
152+
subreddit,
153+
insight_type,
154+
item_name,
155+
item.get("mentions", 0),
156+
item.get("context", ""),
157+
datetime.now(),
158+
execution_date,
159+
MODEL,
160+
))
161+
162+
if not rows:
163+
logger.warning(f"r/{subreddit}: Nenhum insight para inserir")
164+
return
165+
166+
with sql.connect(
167+
server_hostname=DATABRICKS_HOST,
168+
http_path=f"/sql/1.0/warehouses/{DATABRICKS_WAREHOUSE_ID}",
169+
access_token=DATABRICKS_TOKEN,
170+
) as conn:
171+
with conn.cursor() as cursor:
172+
# MERGE para evitar duplicatas
173+
cursor.executemany(
174+
"""
175+
MERGE INTO gold_ai_insights AS target
176+
USING (VALUES (?, ?, ?, ?, ?, ?, ?, ?)) AS source(
177+
subreddit, insight_type, item_name, mentions, context,
178+
generated_at, execution_date, model_version
179+
)
180+
ON target.subreddit = source.subreddit
181+
AND target.insight_type = source.insight_type
182+
AND target.item_name = source.item_name
183+
AND target.execution_date = source.execution_date
184+
WHEN MATCHED THEN UPDATE SET *
185+
WHEN NOT MATCHED THEN INSERT *
186+
""",
187+
rows
188+
)
189+
190+
logger.info(f"r/{subreddit}: {len(rows)} insights inseridos na tabela Gold")
191+
192+
193+
@task
194+
def generate_insights(**context) -> dict:
195+
"""Generate AI insights for all subreddits and write to gold_ai_insights table."""
196+
execution_date = context["ds"]
197+
198+
if not GROQ_API_KEY:
199+
logger.error("GROQ_API_KEY não configurada - pulando geração de insights")
200+
return {"status": "skipped", "reason": "missing_groq_key"}
201+
202+
if not all([DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_WAREHOUSE_ID]):
203+
logger.error("Databricks credentials não configuradas - pulando insights")
204+
return {"status": "skipped", "reason": "missing_databricks_creds"}
205+
206+
logger.info("Buscando subreddits com dados no Databricks...")
207+
subreddits = _get_subreddits_with_data()
208+
logger.info(f"Encontrados {len(subreddits)} subreddits para processar")
209+
210+
processed = 0
211+
errors = 0
212+
213+
for i, sub in enumerate(subreddits, 1):
214+
logger.info(f"[{i}/{len(subreddits)}] Processando r/{sub}...")
215+
216+
content = _get_content_for_subreddit(sub)
217+
if len(content) < 100:
218+
logger.warning(f"r/{sub}: Pouco conteúdo, pulando")
219+
continue
220+
221+
insights = _call_groq(sub, content)
222+
if insights:
223+
_write_insights_to_gold(sub, insights, execution_date)
224+
processed += 1
225+
226+
t = len(insights.get("trending_tools", []))
227+
p = len(insights.get("pain_points", []))
228+
s = len(insights.get("solutions", []))
229+
logger.info(f"r/{sub}: OK ({t}t {p}p {s}s)")
230+
else:
231+
errors += 1
232+
logger.error(f"r/{sub}: Falha ao gerar insights")
233+
234+
if i < len(subreddits):
235+
time.sleep(DELAY_BETWEEN_CALLS)
236+
237+
logger.info(
238+
f"Geração de insights completa: {processed} sucesso, {errors} erros"
239+
)
240+
241+
return {
242+
"status": "completed",
243+
"processed": processed,
244+
"errors": errors,
245+
"total": len(subreddits),
246+
}

airflow/requirements.txt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1-
requests>=2.31,<3
2-
boto3>=1.34,<2
3-
apache-airflow-providers-amazon>=8.0,<9
1+
# Airflow Requirements
2+
# Instalar no Astronomer deployment
3+
4+
apache-airflow==2.10.4
5+
boto3>=1.35.98
6+
openai>=1.59.7
7+
databricks-sql-connector>=3.6.0

0 commit comments

Comments
 (0)