-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathpreprocessor.py
More file actions
134 lines (105 loc) · 4.4 KB
/
preprocessor.py
File metadata and controls
134 lines (105 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
Pinecone preprocess function for boost_mailing_list_tracker.
Guideline source:
- docs/Pinecone_preprocess_guideline.md
This module returns whole-document payloads (is_chunked=False) so the sync
pipeline can apply its configured chunking strategy.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from django.db.models import Q
from boost_mailing_list_tracker.models import MailingListMessage
from cppa_user_tracker.services import get_mailing_list_profiles_by_ids
def _normalize_failed_ids(failed_ids: list[str]) -> list[str]:
"""Return stripped, non-empty, de-duplicated failed IDs preserving order."""
seen: set[str] = set()
out: list[str] = []
for raw in failed_ids:
value = (raw or "").strip()
if not value or value in seen:
continue
seen.add(value)
out.append(value)
return out
def _get_sender_display_name(sender: Any) -> str:
"""Return trimmed display name from sender (display_name or identity.display_name)."""
return (getattr(sender, "display_name", "") or "").strip() or (
getattr(getattr(sender, "identity", None), "display_name", "") or ""
).strip()
def _build_document_content(message: MailingListMessage, sender: Any) -> str:
"""
Build plain-text content for embedding.
Keep this stable and readable so future chunking preserves useful context.
"""
parts: list[str] = []
if message.subject:
parts.append(f"Subject: {message.subject.strip()}")
sender_name = _get_sender_display_name(sender)
if sender_name:
parts.append(f"Sender: {sender_name}")
if message.list_name:
parts.append(f"List: {message.list_name}")
if message.sent_at:
parts.append(f"Sent At: {message.sent_at.isoformat()}")
body = (message.content or "").strip()
if body:
parts.append("")
parts.append(body)
return "\n".join(parts).strip()
def preprocess_mailing_list_for_pinecone(
failed_ids: list[str],
final_sync_at: datetime | None,
) -> tuple[list[dict[str, Any]], bool]:
"""
Build Pinecone sync documents for mailing-list messages.
Args:
failed_ids: Previous-run failed source IDs (we store/retry msg_id values).
final_sync_at: Last sync timestamp for incremental sync; None means first sync.
Returns:
(documents, is_chunked)
- documents: list[{"content": str, "metadata": dict}]
- is_chunked: False (whole docs; ingestion pipeline may chunk later)
"""
normalized_failed = _normalize_failed_ids(failed_ids or [])
queryset = MailingListMessage._default_manager.all()
if final_sync_at is None and not normalized_failed:
candidates = list(queryset.order_by("id"))
else:
criteria = Q()
if final_sync_at is not None:
# created_at tracks DB ingestion time, which is safer for incremental sync
# than sent_at (historical imports can have old sent_at values).
criteria |= Q(created_at__gt=final_sync_at)
if normalized_failed:
criteria |= Q(msg_id__in=normalized_failed)
candidates = list(queryset.filter(criteria).order_by("id"))
profile_ids = [m.sender_profile_id for m in candidates]
profiles_by_id = get_mailing_list_profiles_by_ids(profile_ids)
docs: list[dict[str, Any]] = []
seen_msg_ids: set[str] = set()
for message in candidates:
msg_id = (message.msg_id or "").strip()
if not msg_id or msg_id in seen_msg_ids:
continue
seen_msg_ids.add(msg_id)
sender = profiles_by_id.get(message.sender_profile_id)
content = _build_document_content(message, sender)
if not content:
# Skip unusable empty docs; pipeline also validates chunks.
continue
sender_name = _get_sender_display_name(sender)
safe_timestamp = int(message.sent_at.timestamp()) if message.sent_at else 0
metadata: dict[str, Any] = {
"doc_id": msg_id,
"type": "mailing",
"thread_id": message.thread_id or "",
"subject": message.subject or "",
"author": sender_name,
"timestamp": safe_timestamp,
"parent_id": message.parent_id or "",
"source_ids": str(message.pk),
"list_name": message.list_name or "",
}
docs.append({"content": content, "metadata": metadata})
return docs, False