Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a7eb168
feat: define mem-read scheduler message&consumer; add async mem-reade…
CaralHsi Sep 28, 2025
8a24ec7
feat: add fast/fine mode in mem-reader;
CaralHsi Sep 28, 2025
81915a3
feat: add mem-reader in scheduler
CaralHsi Sep 29, 2025
e875ca0
fix: conflict
CaralHsi Oct 13, 2025
b5086e7
feat: change async remove
CaralHsi Oct 13, 2025
b43a9ff
Merge branch 'dev' of github.com:MemTensor/MemOS into feat/Async-add
CaralHsi Oct 14, 2025
0b2649d
feat: modify async-add in core.py
CaralHsi Oct 14, 2025
9dd632f
feat: add 'remove and refresh memory in scheduler'
CaralHsi Oct 14, 2025
8c97058
feat: add naive fast mode in mem-reader
CaralHsi Oct 14, 2025
3e08a82
feat: finish fast mode in mem-reader
CaralHsi Oct 15, 2025
37fcff8
feat: add token-based window splitting and concurrency improvements
CaralHsi Oct 16, 2025
5f7e8e0
feat: add split chunker into mode in simple struct mem reader
CaralHsi Oct 16, 2025
2355527
feat: update async-mode add
CaralHsi Oct 16, 2025
2ee4c4c
chore: update gitignore
CaralHsi Oct 16, 2025
593faa5
feat: improve database note write performance
CaralHsi Oct 17, 2025
8d2263a
feat: fix mem-read scheduler
CaralHsi Oct 20, 2025
e250ab8
fix: nebula group-by bug
CaralHsi Oct 20, 2025
14e986e
fix: bug in adding mem scheduler
CaralHsi Oct 20, 2025
1609703
Merge branch 'dev' into feat/Async-add
CaralHsi Oct 21, 2025
31adec0
fix: nebula index; mem-reader chat-time;
CaralHsi Oct 21, 2025
8628a37
fix: conflict
CaralHsi Oct 21, 2025
18f3cc8
format: searcher
CaralHsi Oct 21, 2025
4e0133e
fix: some bugs in scheduler and mem-reader
CaralHsi Oct 21, 2025
6653bea
feat: add mem-organize in scheduler
CaralHsi Oct 21, 2025
af5c940
feat: add tree.mode to config; modify scheduler config
CaralHsi Oct 21, 2025
0c6e68b
Merge branch 'dev' into feat/Async-add
CaralHsi Oct 21, 2025
28a20e9
fix: test bug
CaralHsi Oct 21, 2025
b29aa02
Merge branch 'feat/Async-add' of github.com:CaralHsi/MemOSRealPublic …
CaralHsi Oct 21, 2025
cf409fb
Merge branch 'dev' into feat/Async-add
CaralHsi Oct 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ evaluation/.env
!evaluation/configs-example/*.json
evaluation/configs/*
**tree_textual_memory_locomo**
**script.py**
.env
evaluation/scripts/personamem

Expand Down
400 changes: 381 additions & 19 deletions examples/mem_reader/reader.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/memos/chunkers/sentence_chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, config: SentenceChunkerConfig):
)
logger.info(f"Initialized SentenceChunker with config: {config}")

def chunk(self, text: str) -> list[Chunk]:
def chunk(self, text: str) -> list[str] | list[Chunk]:
"""Chunk the given text into smaller chunks based on sentences."""
chonkie_chunks = self.chunker.chunk(text)

Expand Down
2 changes: 0 additions & 2 deletions src/memos/configs/mem_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@ class BaseSchedulerConfig(BaseConfig):
thread_pool_max_workers: int = Field(
default=DEFAULT_THREAD_POOL_MAX_WORKERS,
gt=1,
lt=20,
description=f"Maximum worker threads in pool (default: {DEFAULT_THREAD_POOL_MAX_WORKERS})",
)
consume_interval_seconds: float = Field(
default=DEFAULT_CONSUME_INTERVAL_SECONDS,
gt=0,
le=60,
description=f"Interval for consuming messages from queue in seconds (default: {DEFAULT_CONSUME_INTERVAL_SECONDS})",
)
auth_config_path: str | None = Field(
Expand Down
5 changes: 5 additions & 0 deletions src/memos/configs/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ class TreeTextMemoryConfig(BaseTextMemoryConfig):
),
)

mode: str | None = Field(
default="sync",
description=("whether use asynchronous mode in memory add"),
)


class SimpleTreeTextMemoryConfig(TreeTextMemoryConfig):
"""Simple tree text memory configuration class."""
Expand Down
39 changes: 23 additions & 16 deletions src/memos/graph_dbs/nebular.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,20 +440,22 @@ def remove_oldest_memory(
memory_type (str): Memory type (e.g., 'WorkingMemory', 'LongTermMemory').
keep_latest (int): Number of latest WorkingMemory entries to keep.
"""
optional_condition = ""

user_name = user_name if user_name else self.config.user_name

optional_condition = f"AND n.user_name = '{user_name}'"
query = f"""
MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */)
WHERE n.memory_type = '{memory_type}'
{optional_condition}
ORDER BY n.updated_at DESC
OFFSET {int(keep_latest)}
DETACH DELETE n
"""
self.execute_query(query)
try:
user_name = user_name if user_name else self.config.user_name
optional_condition = f"AND n.user_name = '{user_name}'"
count = self.count_nodes(memory_type, user_name)
if count > keep_latest:
delete_query = f"""
MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */)
WHERE n.memory_type = '{memory_type}'
{optional_condition}
ORDER BY n.updated_at DESC
OFFSET {int(keep_latest)}
DETACH DELETE n
"""
self.execute_query(delete_query)
except Exception as e:
logger.warning(f"Delete old mem error: {e}")

@timed
def add_node(
Expand Down Expand Up @@ -1175,7 +1177,6 @@ def get_grouped_counts(
MATCH (n /*+ INDEX(idx_memory_user_name) */)
{where_clause}
RETURN {", ".join(return_fields)}, COUNT(n) AS count
GROUP BY {", ".join(group_by_fields)}
"""
result = self.execute_query(gql) # Pure GQL string execution

Expand Down Expand Up @@ -1620,7 +1621,13 @@ def _create_basic_property_indexes(self) -> None:
Create standard B-tree indexes on user_name when use Shared Database
Multi-Tenant Mode.
"""
fields = ["status", "memory_type", "created_at", "updated_at", "user_name"]
fields = [
"status",
"memory_type",
"created_at",
"updated_at",
"user_name",
]

for field in fields:
index_name = f"idx_memory_{field}"
Expand Down
2 changes: 1 addition & 1 deletion src/memos/graph_dbs/neo4j.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ def search_by_embedding(
vector (list[float]): The embedding vector representing query semantics.
top_k (int): Number of top similar nodes to retrieve.
scope (str, optional): Memory type filter (e.g., 'WorkingMemory', 'LongTermMemory').
status (str, optional): Node status filter (e.g., 'active', 'archived').
status (str, optional): Node status filter (e.g., 'activated', 'archived').
If provided, restricts results to nodes with matching status.
threshold (float, optional): Minimum similarity score threshold (0 ~ 1).
search_filter (dict, optional): Additional metadata filters for search results.
Expand Down
46 changes: 38 additions & 8 deletions src/memos/mem_os/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from memos.mem_scheduler.schemas.general_schemas import (
ADD_LABEL,
ANSWER_LABEL,
MEM_READ_LABEL,
QUERY_LABEL,
)
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
Expand Down Expand Up @@ -70,6 +71,7 @@ def __init__(self, config: MOSConfig, user_manager: UserManager | None = None):
if self.enable_mem_scheduler:
self._mem_scheduler = self._initialize_mem_scheduler()
self._mem_scheduler.mem_cubes = self.mem_cubes
self._mem_scheduler.mem_reader = self.mem_reader
else:
self._mem_scheduler: GeneralScheduler = None

Expand Down Expand Up @@ -681,6 +683,12 @@ def add(
logger.info(
f"time add: get mem_cube_id check in mem_cubes time user_id: {target_user_id} time is: {time.time() - time_start_0}"
)
sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode
if sync_mode == "async":
assert self.mem_scheduler is not None, (
"Mem-Scheduler must be working when use asynchronous memory adding."
)
logger.debug(f"Mem-reader mode is: {sync_mode}")
time_start_1 = time.time()
if (
(messages is not None)
Expand All @@ -690,6 +698,7 @@ def add(
logger.info(
f"time add: messages is not None and enable_textual_memory and text_mem is not None time user_id: {target_user_id} time is: {time.time() - time_start_1}"
)

if self.mem_cubes[mem_cube_id].config.text_mem.backend != "tree_text":
add_memory = []
metadata = TextualMemoryMetadata(
Expand All @@ -707,21 +716,30 @@ def add(
messages_list,
type="chat",
info={"user_id": target_user_id, "session_id": target_session_id},
mode="fast" if sync_mode == "async" else "fine",
)
logger.info(
f"time add: get mem_reader time user_id: {target_user_id} time is: {time.time() - time_start_2}"
)
mem_ids = []
for mem in memories:
mem_id_list: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(mem)
mem_ids.extend(mem_id_list)
logger.info(
f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_id_list}"
)

memories_flatten = [m for m_list in memories for m in m_list]
mem_ids: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(memories_flatten)
logger.info(
f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_ids}"
)
# submit messages for scheduler
if self.enable_mem_scheduler and self.mem_scheduler is not None:
mem_cube = self.mem_cubes[mem_cube_id]
if sync_mode == "async":
message_item = ScheduleMessageItem(
user_id=target_user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
label=MEM_READ_LABEL,
content=json.dumps(mem_ids),
timestamp=datetime.utcnow(),
)
self.mem_scheduler.submit_messages(messages=[message_item])

message_item = ScheduleMessageItem(
user_id=target_user_id,
mem_cube_id=mem_cube_id,
Expand Down Expand Up @@ -749,10 +767,12 @@ def add(
messages_list = [
[{"role": "user", "content": memory_content}]
] # for only user-str input and convert message

memories = self.mem_reader.get_memory(
messages_list,
type="chat",
info={"user_id": target_user_id, "session_id": target_session_id},
mode="fast" if sync_mode == "async" else "fine",
)

mem_ids = []
Expand All @@ -766,6 +786,16 @@ def add(
# submit messages for scheduler
if self.enable_mem_scheduler and self.mem_scheduler is not None:
mem_cube = self.mem_cubes[mem_cube_id]
if sync_mode == "async":
message_item = ScheduleMessageItem(
user_id=target_user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
label=MEM_READ_LABEL,
content=json.dumps(mem_ids),
timestamp=datetime.utcnow(),
)
self.mem_scheduler.submit_messages(messages=[message_item])
message_item = ScheduleMessageItem(
user_id=target_user_id,
mem_cube_id=mem_cube_id,
Expand Down
9 changes: 8 additions & 1 deletion src/memos/mem_reader/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[str]:

@abstractmethod
def get_memory(
self, scene_data: list, type: str, info: dict[str, Any]
self, scene_data: list, type: str, info: dict[str, Any], mode: str = "fast"
) -> list[list[TextualMemoryItem]]:
"""Various types of memories extracted from scene_data"""

@abstractmethod
def transform_memreader(self, data: dict) -> list[TextualMemoryItem]:
"""Transform the memory data into a list of TextualMemoryItem objects."""

@abstractmethod
def fine_transfer_simple_mem(
self, input_memories: list[list[TextualMemoryItem]], type: str
) -> list[list[TextualMemoryItem]]:
"""Fine Transform TextualMemoryItem List into another list of
TextualMemoryItem objects via calling llm to better understand users."""
Loading