-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathmain.py
More file actions
1259 lines (1050 loc) · 48.2 KB
/
main.py
File metadata and controls
1259 lines (1050 loc) · 48.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
import json
import logging
import os
import sys
import time
import argparse
from cortex.memory_system import AgenticMemorySystem
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dotenv import load_dotenv
load_dotenv(".env")
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)
def _parse_date_range(date_str: str) -> Optional[Dict[str, str]]:
"""Parse natural language date range into RFC3339 timestamp format"""
from datetime import datetime, timedelta
import re
if not date_str:
return None
original_str = date_str.strip()
date_str_lower = date_str.lower().strip()
current_time = datetime.now().astimezone()
# RFC3339 format (direct passthrough) - check first before lowercasing
if re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', original_str):
return {"start": original_str, "end": original_str}
# Use lowercase for natural language patterns
date_str = date_str_lower
# Relative date patterns
if "yesterday" in date_str:
start = current_time - timedelta(days=1)
start = start.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1) - timedelta(microseconds=1)
return {
"start": start.isoformat(),
"end": end.isoformat()
}
if "last week" in date_str:
start = current_time - timedelta(weeks=1)
start = start.replace(hour=0, minute=0, second=0, microsecond=0)
end = current_time
return {
"start": start.isoformat(),
"end": end.isoformat()
}
if "last month" in date_str:
start = current_time - timedelta(days=30)
start = start.replace(hour=0, minute=0, second=0, microsecond=0)
end = current_time
return {
"start": start.isoformat(),
"end": end.isoformat()
}
# Specific date formats
# YYYY-MM format
if re.match(r'^\d{4}-\d{2}$', date_str):
year, month = date_str.split('-')
start = datetime(int(year), int(month), 1).astimezone()
# Calculate end of month
next_month = start + timedelta(days=32)
end = next_month.replace(day=1) - timedelta(microseconds=1)
return {
"start": start.isoformat(),
"end": end.isoformat()
}
# YYYY format
if re.match(r'^\d{4}$', date_str):
start = datetime(int(date_str), 1, 1).astimezone()
end = datetime(int(date_str), 12, 31, 23, 59, 59, 999999).astimezone()
return {
"start": start.isoformat(),
"end": end.isoformat()
}
logger.warning(f"Could not parse date range: {date_str}")
return None
memory_system = AgenticMemorySystem(
stm_capacity=5,
enable_smart_collections=True, # Enable for better organization of diverse content
enable_background_processing=False # Synchronous processing to see collection creation before queries
)
# for text splitting (before storing into memory)
CHUNK_SIZE = 2000 # Characters per chunk
CHUNK_OVERLAP = 200 # Overlap between chunks
class MemoryInput(BaseModel):
"""Input model for storing a memory"""
content: str = Field(..., description="The content of the memory to store")
context: Optional[str] = Field(None, description="Optional context for the memory")
tags: Optional[List[str]] = Field(None, description="Optional tags for the memory")
timestamp: Optional[str] = Field(None, description="Optional custom timestamp (RFC3339 format: 2023-01-01T12:00:00+00:00)")
user_id: Optional[str] = Field(None, description="Optional user identifier for memory segregation")
session_id: Optional[str] = Field(None, description="Optional session identifier for memory segregation")
class MemoryOutput(BaseModel):
"""Output model for a memory"""
id: str
content: str
context: Optional[str] = None
tags: Optional[List[str]] = None
keywords: Optional[List[str]] = None
timestamp: Optional[str] = None
score: Optional[float] = None
is_linked: bool = False
memory_tier: Optional[str] = None
collection_name: Optional[str] = None
category: Optional[str] = None
composite_score: Optional[float] = None
relationship_type: Optional[str] = None
relationship_strength: Optional[float] = None
relationship_reason: Optional[str] = None
class StoreResponse(BaseModel):
"""Response model for the store endpoint"""
id: str
success: bool
message: str
class RetrieveResponse(BaseModel):
"""Response model for the retrieve endpoint"""
memories: List[MemoryOutput]
count: int
def _safe_parse_tags(tags_value):
"""Safely parse tags that might be strings or lists"""
if isinstance(tags_value, list):
return tags_value
elif isinstance(tags_value, str):
try:
# Try to parse as JSON if it looks like a JSON array
if tags_value.strip().startswith('[') and tags_value.strip().endswith(']'):
return json.loads(tags_value)
else:
# Split by comma if it's a simple string
return [tag.strip() for tag in tags_value.split(',') if tag.strip()]
except json.JSONDecodeError:
# If JSON parsing fails, split by comma
return [tag.strip() for tag in tags_value.split(',') if tag.strip()]
else:
return []
def store_memory(memory: MemoryInput) -> StoreResponse:
"""
Store a new memory in the system.
The memory content is processed automatically to extract keywords,
establish relationships with existing memories, and handle potential
merging with similar content.
Returns:
StoreResponse: Contains the ID of the stored memory, success status, and message
"""
# Extract data from the request
content = memory.content
context = memory.context
tags = memory.tags or []
timestamp = memory.timestamp
user_id = memory.user_id
session_id = memory.session_id
# Prepare keyword arguments for memory storage
# add_note automatically analyzes content through DeepPreprocessor
kwargs = {}
if context:
kwargs["context"] = context
if tags:
kwargs["tags"] = tags
# Store the memory and get its ID
memory_id = memory_system.add_note(
content,
time=timestamp,
user_id=user_id,
session_id=session_id,
**kwargs
)
return StoreResponse(
id=memory_id,
success=True,
message="Memory stored successfully"
)
def retrieve_memories(
q: str,
limit: int = 5,
memory_source: str = "all", # Options: "stm", "ltm", "all"
context: Optional[str] = None,
tags: Optional[str] = None,
exclude_content: bool = False,
include_links: bool = True,
apply_postprocessing: bool = True,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
temporal_weight: Optional[float] = None,
date_range: Optional[str] = None
) -> RetrieveResponse:
"""
Retrieve memories related to the provided query.
The system uses semantic search to find the most relevant memories.
Optional filters can be applied for context and tags.
Args:
q: Search query to find related memories
limit: Maximum number of memories to return
memory_source: Which memory tiers to search ("stm", "ltm", "all")
context: Optional context for filtering and relevance
tags: Optional comma-separated tags for filtering
exclude_content: Whether to exclude content in results
include_links: Whether to include linked memories
apply_postprocessing: Whether to apply post-retrieval processing
user_id: Optional user identifier for memory segregation
session_id: Optional session identifier for memory segregation
temporal_weight: Optional temporal weighting (0.0=semantic only, 1.0=recency only, None=auto-detect)
date_range: Optional date range filter ("last week", "2023-01", "yesterday", RFC3339 timestamps)
Returns:
RetrieveResponse: Contains list of memories ordered by relevance and count
"""
# Parse tags if provided
tag_list = _parse_tags(tags)
# Create filter based on context and tags
# where_filter = _create_filter(context, tag_list)
where_filter = {}
# Parse date range if provided
parsed_date_range = None
if date_range:
parsed_date_range = _parse_date_range(date_range)
# Auto-detect temporal queries if temporal_weight not specified
if temporal_weight is None:
temporal_keywords = ["last", "recent", "latest", "yesterday", "today", "this week", "past", "ago"]
if any(keyword in q.lower() for keyword in temporal_keywords) or date_range:
temporal_weight = 0.7 # Heavy temporal weighting for temporal queries
else:
temporal_weight = 0.0 # Pure semantic search for non-temporal queries
# Search memories in specified tiers
results = memory_system.search_memory(
query=q,
limit=limit,
memory_source=memory_source,
where_filter=where_filter,
apply_postprocessing=apply_postprocessing,
context=context,
user_id=user_id,
session_id=session_id,
temporal_weight=temporal_weight,
date_range=parsed_date_range
)
# Format the results
memories = []
seen_ids = set()
# Process direct results
for result in results:
memory_id = result.get("id", "")
if not memory_id or memory_id in seen_ids:
continue
seen_ids.add(memory_id)
# Safely parse tags
tags_value = result.get("tags", [])
parsed_tags = _safe_parse_tags(tags_value)
memory_output = {
"id": memory_id,
"context": result.get("context", ""),
"tags": parsed_tags,
"timestamp": str(result.get("timestamp", "")), # Convert timestamp to string
"score": result.get("score"),
"is_linked": False,
"memory_tier": result.get("memory_tier", "unknown"),
"collection_name": result.get("collection_name", "No Collection"),
"category": result.get("category", "Uncategorized"),
"composite_score": result.get("composite_score")
}
if not exclude_content:
memory_output["content"] = result.get("content", "")
memories.append(memory_output)
# Process linked memories if requested
if include_links:
linked_memories = _process_linked_memories(
results,
seen_ids,
exclude_content,
user_id,
session_id
)
memories.extend(linked_memories)
return RetrieveResponse(
memories=memories,
count=len(memories)
)
def clear_short_term_memory(user_id: Optional[str] = None, session_id: Optional[str] = None) -> Dict[str, Any]:
"""
Clear the short-term memory for a user/session or all if not specified.
Args:
user_id: Optional user identifier
session_id: Optional session identifier
Returns:
Dict with success status and message
"""
memory_system.clear_stm(user_id, session_id)
scope = "all memories"
if user_id and session_id:
scope = f"user '{user_id}' session '{session_id}'"
elif user_id:
scope = f"user '{user_id}'"
elif session_id:
scope = f"session '{session_id}'"
return {
"success": True,
"message": f"Short-term memory cleared for {scope}"
}
def _parse_tags(tags: Optional[str]) -> Optional[List[str]]:
if not tags:
return None
try:
return tags.split(",")
except:
return None
def _create_filter(context: Optional[str], tags: Optional[List[str]]) -> Dict[str, Any]:
where_filter = {}
if context:
where_filter["context"] = {"$contains": context}
if tags:
where_filter["tags"] = {"$contains": tags}
return where_filter
def _process_linked_memories(
results: List[Dict[str, Any]],
seen_ids: set,
exclude_content: bool,
user_id: Optional[str] = None,
session_id: Optional[str] = None
) -> List[Dict[str, Any]]:
linked_memories = []
for result in results:
memory_id = result.get("id", "")
if not memory_id:
continue
links = result.get("links", {})
# Handle both dict and list formats for links
link_ids = []
if isinstance(links, dict):
link_ids = list(links.keys())
elif isinstance(links, list):
link_ids = links
# Process each linked memory
for link_id in link_ids:
if link_id in seen_ids:
continue
seen_ids.add(link_id)
linked_memory = memory_system.read(link_id, user_id, session_id)
if not linked_memory:
continue
link_output = _create_linked_memory_output(
linked_memory,
link_id,
exclude_content,
links
)
linked_memories.append(link_output)
return linked_memories
def _create_linked_memory_output(
linked_memory: Any,
link_id: str,
exclude_content: bool,
links: Dict[str, Any]
) -> Dict[str, Any]:
# Get memory attributes safely
tags_value = getattr(linked_memory, "tags", [])
parsed_tags = _safe_parse_tags(tags_value)
link_output = {
"id": link_id,
"context": getattr(linked_memory, "context", ""),
"tags": parsed_tags,
"timestamp": str(getattr(linked_memory, "timestamp", "")), # Convert timestamp to string
"score": None,
"is_linked": True,
"memory_tier": "ltm" # Linked memories typically come from LTM
}
if not exclude_content:
link_output["content"] = getattr(linked_memory, "content", "")
# Get relationship metadata if available
if isinstance(links, dict) and link_id in links:
link_data = links[link_id]
if isinstance(link_data, dict):
link_output["relationship_type"] = link_data.get("type")
link_output["relationship_strength"] = link_data.get("strength")
link_output["relationship_reason"] = link_data.get("reason")
return link_output
def compare_memory_sources(query: str, user_id: Optional[str] = None, session_id: Optional[str] = None):
"""
Compare retrieval results from different memory sources with detailed collection analytics
"""
print(f"\nQuery: '{query}'")
# Capture collection state before search
collection_snapshot = capture_collection_snapshot()
# Search in STM only
stm_results = retrieve_memories(
q=query,
limit=3,
user_id=user_id,
session_id=session_id,
memory_source="stm"
)
# Search in LTM only
ltm_results = retrieve_memories(
q=query,
limit=3,
user_id=user_id,
session_id=session_id,
memory_source="ltm"
)
# Perform detailed "all" search to capture collection-aware analytics
all_results_detailed = retrieve_memories_with_collection_analytics(
q=query,
limit=6,
user_id=user_id,
session_id=session_id,
context=None # Could be enhanced with context
)
# Combine STM/LTM for compatibility comparison
combined_memories = []
seen_ids = set()
# Add STM memories first
for memory in stm_results.memories:
combined_memories.append(memory)
seen_ids.add(memory.id)
# Add LTM memories that aren't already in the combined list
for memory in ltm_results.memories:
if memory.id not in seen_ids:
combined_memories.append(memory)
seen_ids.add(memory.id)
# Sort the combined memories by score in descending order (highest scores first)
combined_memories.sort(key=lambda x: (x.score is not None, x.score or float('-inf')), reverse=True)
# Create a RetrieveResponse with the combined results
all_results = RetrieveResponse(
memories=combined_memories,
count=len(combined_memories)
)
# Print results with collection info
print(f"STM results: {stm_results.count}")
for memory in stm_results.memories:
collection_info = memory.collection_name if hasattr(memory, 'collection_name') else 'No Collection'
print(f"- {memory.content[:100]}... (Tier: {memory.memory_tier}, Score: {memory.score}, Collection: {collection_info})")
print(f"\nLTM results: {ltm_results.count}")
for memory in ltm_results.memories:
collection_info = memory.collection_name if hasattr(memory, 'collection_name') else 'No Collection'
print(f"- {memory.content[:100]}... (Tier: {memory.memory_tier}, Score: {memory.score}, Collection: {collection_info})")
print(f"\nCollection-Aware results: {all_results_detailed['count']}")
for memory in all_results_detailed['memories']:
collection_info = memory.get('collection_name', 'No Collection')
composite_score = memory.get('composite_score', memory.get('score', 0))
score_display = f"{composite_score:.3f}" if composite_score is not None else "None"
print(f"- {memory.get('content', '')[:100]}... (Tier: {memory.get('memory_tier', 'unknown')}, Score: {score_display}, Collection: {collection_info})")
# Show collection analytics if available
if 'collection_analytics' in all_results_detailed:
analytics = all_results_detailed['collection_analytics']
print(f"\nCollection Analytics:")
print(f" Collections searched: {analytics.get('collections_searched', 0)}")
print(f" Query transformations: {analytics.get('query_transformations', 0)}")
print(f" Relevant collections: {analytics.get('relevant_collections', 0)}")
if analytics.get('collection_details'):
print(f" Collection breakdown:")
for collection_name, details in analytics['collection_details'].items():
transformed = "✓" if details.get('query_transformed') else "✗"
print(f" • {collection_name}: {details.get('results_count', 0)} results, enhanced={transformed}")
print("\n--------------------------------\n")
return {
"stm": stm_results.memories,
"ltm": ltm_results.memories,
"all": all_results.memories,
"collection_aware": all_results_detailed,
"collection_snapshot": collection_snapshot
}
def retrieve_memories_with_collection_analytics(
q: str,
limit: int = 5,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
context: Optional[str] = None
) -> Dict[str, Any]:
"""
Enhanced retrieve that captures detailed collection analytics
"""
# Direct call to memory system to get detailed results
results = memory_system.search_memory(
query=q,
limit=limit,
memory_source="all",
user_id=user_id,
session_id=session_id,
context=context
)
# Extract collection analytics from results
collection_analytics = extract_collection_analytics(results)
# Format memories with full details
formatted_memories = []
for result in results:
# Safely parse tags
tags_value = result.get("tags", [])
parsed_tags = _safe_parse_tags(tags_value)
memory_data = {
"id": result.get("id", ""),
"content": result.get("content", ""),
"context": result.get("context", ""),
"tags": parsed_tags,
"keywords": result.get("keywords", []),
"timestamp": str(result.get("timestamp", "")),
"score": result.get("score"),
"composite_score": result.get("composite_score"),
"memory_tier": result.get("memory_tier", "unknown"),
"collection_name": result.get("collection_name", "No Collection"),
"collection_similarity": result.get("collection_similarity"),
"category": result.get("category", "Uncategorized")
}
formatted_memories.append(memory_data)
return {
"memories": formatted_memories,
"count": len(formatted_memories),
"collection_analytics": collection_analytics,
"query": q,
"context": context
}
def extract_collection_analytics(results: List[Dict]) -> Dict[str, Any]:
"""Extract detailed analytics from search results"""
collections_used = set()
query_transformations = 0
relevant_collections = 0
collection_details = {}
for result in results:
collection_name = result.get("collection_name")
if collection_name and collection_name != "No Collection":
collections_used.add(collection_name)
if collection_name not in collection_details:
collection_details[collection_name] = {
"results_count": 0,
"query_transformed": bool(result.get("composite_score")), # Has composite score = was transformed
"avg_collection_similarity": 0,
"collection_similarity_sum": 0
}
details = collection_details[collection_name]
details["results_count"] += 1
if result.get("collection_similarity"):
details["collection_similarity_sum"] += result.get("collection_similarity", 0)
details["avg_collection_similarity"] = details["collection_similarity_sum"] / details["results_count"]
if result.get("composite_score"):
relevant_collections += 1
# Count unique transformations
query_transformations = sum(1 for details in collection_details.values() if details["query_transformed"])
return {
"collections_searched": len(collections_used),
"query_transformations": query_transformations,
"relevant_collections": len(set(result.get("collection_name") for result in results
if result.get("composite_score"))),
"collection_details": collection_details,
"total_results": len(results)
}
def capture_collection_snapshot() -> Dict[str, Any]:
"""Capture current state of collections for analytics"""
if not hasattr(memory_system, 'collection_manager') or not memory_system.collection_manager:
return {"collections_enabled": False}
cm = memory_system.collection_manager
snapshot = {
"collections_enabled": True,
"total_collections": len(cm.collections),
"collections": {},
"category_stats": dict(cm.category_counts),
"total_patterns": len(cm.category_counts),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
# Capture each collection's metadata
for collection_name, info in cm.collections.items():
snapshot["collections"][collection_name] = {
"memory_count": info.get("memory_count", 0),
"created_at": info.get("created_at", ""),
"description": info.get("description", "")[:200], # Truncate for storage
"query_helper": info.get("query_helper", "")[:200], # Truncate for storage
"last_updated": info.get("last_updated", "")
}
return snapshot
def extract_segments_from_file(file_path):
"""
Extract meaningful text segments from a file using RecursiveCharacterTextSplitter.
Args:
file_path: Path to the file
Returns:
List of text segments
"""
logger.info(f"Loading data from: {file_path}")
try:
if not os.path.exists(file_path):
logger.error(f"Input file not found: {file_path}")
return []
# Read file content
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if not content.strip():
logger.warning(f"File {file_path} is empty")
return []
# Define the splitter based on Langchain's RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=CHUNK_OVERLAP,
length_function=len,
separators=["\n\n\n\n", "\n\n\n", "\n\n", "\n", " ", ""],
)
# Initial split by major separator (if desired, or just use the recursive splitter directly)
initial_pages = content.split("\n\n\n\n")
final_chunks = []
logger.info(f"Initial split resulted in {len(initial_pages)} major sections.")
for i, page in enumerate(initial_pages):
if len(page) > CHUNK_SIZE:
# Recursively split oversized pages
sub_chunks = splitter.split_text(page)
final_chunks.extend(sub_chunks)
logger.debug(f"Section {i+1} was larger than chunk size, split into {len(sub_chunks)} sub-chunks.")
elif page.strip(): # Add non-empty pages smaller than chunk size directly
final_chunks.append(page)
else:
logger.debug(f"Skipping empty section {i+1}.")
logger.info(f"Total chunks after recursive splitting: {len(final_chunks)}")
return final_chunks
except Exception as e:
logger.error(f"Error loading data: {e}")
return []
def save_memories_to_json(memories, filename):
"""Save memories to a JSON file"""
# Extract relevant data for each memory
serialized_memories = []
for memory in memories:
# Try different ways to get keywords (depending on memory object type)
if hasattr(memory, "keywords"):
keywords = memory.keywords
elif isinstance(memory, dict) and "keywords" in memory:
keywords = memory["keywords"]
else:
# Try to get from original memory object
memory_id = getattr(memory, "id", None)
if memory_id and memory_id in memory_system.memories:
original_memory = memory_system.memories[memory_id]
keywords = getattr(original_memory, "keywords", [])
else:
keywords = []
# Build dictionary with all relevant fields
mem_dict = {
"id": memory.id,
"content": memory.content,
"context": memory.context,
"tags": memory.tags,
"keywords": keywords,
"timestamp": memory.timestamp,
"score": memory.score,
"memory_tier": getattr(memory, "memory_tier", None)
}
serialized_memories.append(mem_dict)
# Save to file
with open(filename, 'w', encoding='utf-8') as f:
json.dump(serialized_memories, f, indent=2)
print(f"Saved {len(serialized_memories)} memories to {filename}")
def get_stm_memories():
"""Get all memories from STM"""
# Use the user_memories dict in STM to get all memories
memories = []
for user_key in memory_system.stm.user_memories:
store = memory_system.stm.user_memories[user_key]
for memory_id, memory_data in store.items():
memory_data["id"] = memory_id
memory_data["memory_tier"] = "stm"
# Convert to MemoryOutput for consistent format
memory_out = MemoryOutput(**memory_data)
memories.append(memory_out)
return memories
def display_collection_summary():
"""Display summary of created collections"""
if not hasattr(memory_system, 'collection_manager') or not memory_system.collection_manager:
print("No Smart Collections enabled")
return
collections = memory_system.collection_manager.collections
if not collections:
print("No collections created yet")
return
print(f"\n{'='*60}")
print(f"SMART COLLECTIONS SUMMARY ({len(collections)} collections)")
print(f"{'='*60}")
for i, (collection_name, info) in enumerate(collections.items(), 1):
created_at = info.get('created_at', 'Unknown')
memory_count = info.get('memory_count', 0)
description = info.get('description', 'No description')
print(f"\n{i}. Collection: '{collection_name}'")
print(f" Memories: {memory_count}")
print(f" Created: {created_at[:19] if created_at != 'Unknown' else created_at}")
print(f" Description: {description[:120]}{'...' if len(description) > 120 else ''}")
category_counts = memory_system.collection_manager.category_counts
print(f"\nCATEGORY STATISTICS:")
print(f" Total patterns tracked: {len(category_counts)}")
# Show top 10 patterns by count
sorted_patterns = sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:10]
if sorted_patterns:
print(f" Top patterns:")
for pattern, count in sorted_patterns:
print(f" • {pattern}: {count} memories")
print(f"{'='*60}\n")
def get_ltm_memories(user_id=None, session_id=None):
"""
Get memories from LTM by searching with a broad query
"""
# We'll use a very generic query to get as many memories as possible
results = memory_system.search_memory(
query="", # Empty query returns everything
limit=100, # Large limit to get most memories
memory_source="ltm",
user_id=user_id,
session_id=session_id
)
memories = []
for result in results:
result["timestamp"] = str(result["timestamp"])
# Ensure keywords are included if available
if "keywords" not in result and memory_system.memories.get(result["id"]):
mem_note = memory_system.memories.get(result["id"])
result["keywords"] = getattr(mem_note, "keywords", [])
# Safely parse tags
tags_value = result.get("tags", [])
result["tags"] = _safe_parse_tags(tags_value)
# Convert to MemoryOutput for consistent format
memory_out = MemoryOutput(**result)
memories.append(memory_out)
return memories
def load_memories_from_json(stm_json_path=None, ltm_json_path=None):
"""
Load memories from pre-stored JSON files into the memory system.
Args:
stm_json_path: Path to the STM memories JSON file
ltm_json_path: Path to the LTM memories JSON file
Returns:
Tuple of (stm_memories, ltm_memories) as MemoryOutput objects
"""
stm_memories = []
ltm_memories = []
# Load STM memories if provided
if stm_json_path and os.path.exists(stm_json_path):
logger.info(f"Loading STM memories from {stm_json_path}")
try:
with open(stm_json_path, 'r', encoding='utf-8') as f:
stm_data = json.load(f)
# Convert JSON data to MemoryOutput objects
for mem_data in stm_data:
memory_out = MemoryOutput(**mem_data)
stm_memories.append(memory_out)
# Prepare metadata for STM
content = mem_data["content"]
user_id = mem_data.get("user_id")
session_id = mem_data.get("session_id")
# Create metadata dict for ShortTermMemory.add method
metadata = {
"context": mem_data.get("context", ""),
"tags": mem_data.get("tags", []),
"keywords": mem_data.get("keywords", []),
"timestamp": mem_data.get("timestamp"),
"category": mem_data.get("category", "Uncategorized"),
"links": mem_data.get("links", {}),
"retrieval_count": mem_data.get("retrieval_count", 0),
"last_accessed": mem_data.get("last_accessed"),
"evolution_history": mem_data.get("evolution_history", []),
"user_id": user_id,
"session_id": session_id
}
# Calculate embedding for content
# We need to process it with the light processor
enhanced_metadata = memory_system.light_processor.process(content, metadata)
# Add to STM using the correct method
memory_system.stm.add(
mem_data["id"],
content,
enhanced_metadata,
user_id,
session_id
)
logger.info(f"Loaded {len(stm_memories)} memories into STM")
except Exception as e:
logger.error(f"Error loading STM memories: {e}")
# Load LTM memories if provided
if ltm_json_path and os.path.exists(ltm_json_path):
logger.info(f"Loading LTM memories from {ltm_json_path}")
try:
with open(ltm_json_path, 'r', encoding='utf-8') as f:
ltm_data = json.load(f)
# Convert JSON data to MemoryOutput objects
for mem_data in ltm_data:
memory_out = MemoryOutput(**mem_data)
ltm_memories.append(memory_out)
# Add to LTM
content = mem_data["content"]
user_id = mem_data.get("user_id")
session_id = mem_data.get("session_id")
# Create metadata dict for LongTermMemory.add method
metadata = {
"context": mem_data.get("context", ""),
"tags": mem_data.get("tags", []),
"keywords": mem_data.get("keywords", []),
"timestamp": mem_data.get("timestamp"),
"category": mem_data.get("category", "Uncategorized"),
"links": mem_data.get("links", {}),
"retrieval_count": mem_data.get("retrieval_count", 0),
"last_accessed": mem_data.get("last_accessed"),
"evolution_history": mem_data.get("evolution_history", []),
"user_id": user_id,
"session_id": session_id,
"id": mem_data["id"] # Add ID to metadata for LTM
}
# Process with deep processor to get proper embeddings
enhanced_metadata = memory_system.deep_processor.process(content, metadata)
# Add to LTM
memory_system.ltm.add(
mem_data["id"],
content,
enhanced_metadata,
user_id,
session_id
)
# Also add to the memory system's dictionary for backward compatibility
note = memory_system._dict_to_memory_note(mem_data)
memory_system.memories[mem_data["id"]] = note
logger.info(f"Loaded {len(ltm_memories)} memories into LTM")
except Exception as e:
logger.error(f"Error loading LTM memories: {e}")
return stm_memories, ltm_memories
def run_single_query(query: str, limit: int = 3, user_id: str = "default", session_id: str = "default") -> None:
"""
Run a single query and display results
"""
print(f"\n=== Query: '{query}' ===")
try:
# Perform search
response = retrieve_memories(
q=query,
limit=limit,
user_id=user_id,
session_id=session_id,
memory_source="ltm"
)
if response.count == 0:
print("No results found.")
return
print(f"Found {response.count} results:")
print("-" * 80)
for i, memory in enumerate(response.memories, 1):
score_display = f"{memory.score:.3f}" if memory.score is not None else "None"
category_info = memory.category or 'No Category'
print(f"{i}. Score: {score_display} | Tier: {memory.memory_tier} | Category: {category_info}")
print(f" Content: {memory.content[:500]}{'...' if len(memory.content) > 500 else ''}")
if memory.tags:
print(f" Tags: {', '.join(memory.tags)}")
if memory.timestamp: