Skip to content

Commit 2a7a5ce

Browse files
authored
fix: Fix the issue where scheduled tasks are not using Redis problem (#1267)
* fix: add filter of nolike && MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS * fix: create_config null * fix: add use_redis_queue config * fix: add related_id
1 parent 87170e3 commit 2a7a5ce

File tree

4 files changed

+40
-3
lines changed

4 files changed

+40
-3
lines changed

src/memos/api/config.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ def get_scheduler_config() -> dict[str, Any]:
854854
),
855855
"context_window_size": int(os.getenv("MOS_SCHEDULER_CONTEXT_WINDOW_SIZE", "5")),
856856
"thread_pool_max_workers": int(
857-
os.getenv("MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS", "10000")
857+
os.getenv("MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS", "200")
858858
),
859859
"consume_interval_seconds": float(
860860
os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "0.01")
@@ -867,6 +867,8 @@ def get_scheduler_config() -> dict[str, Any]:
867867
"MOS_SCHEDULER_ENABLE_ACTIVATION_MEMORY", "false"
868868
).lower()
869869
== "true",
870+
"use_redis_queue": os.getenv("MEMSCHEDULER_USE_REDIS_QUEUE", "False").lower()
871+
== "true",
870872
},
871873
}
872874

src/memos/api/handlers/component_init.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,8 @@ def init_server() -> dict[str, Any]:
255255
# Initialize Scheduler
256256
scheduler_config_dict = APIConfig.get_scheduler_config()
257257
scheduler_config = SchedulerConfigFactory(
258-
backend="optimized_scheduler", config=scheduler_config_dict
258+
backend=scheduler_config_dict["backend"],
259+
config=scheduler_config_dict["config"],
259260
)
260261
mem_scheduler: OptimizedScheduler = SchedulerFactory.from_config(scheduler_config)
261262
mem_scheduler.initialize_modules(

src/memos/configs/mem_scheduler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,10 @@ def validate_backend(cls, backend: str) -> str:
155155
@model_validator(mode="after")
156156
def create_config(self) -> "SchedulerConfigFactory":
157157
config_class = self.backend_to_class[self.backend]
158-
self.config = config_class(**self.config)
158+
raw = self.config
159+
if isinstance(raw, dict) and "config" in raw and "use_redis_queue" not in raw:
160+
raw = raw["config"]
161+
self.config = config_class(**raw)
159162
return self
160163

161164

src/memos/graph_dbs/polardb.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4667,6 +4667,36 @@ def build_filter_condition(condition_dict: dict) -> str:
46674667
condition_parts.append(
46684668
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype)::text LIKE '%{op_value}%'"
46694669
)
4670+
elif op == "nolike":
4671+
if key.startswith("info."):
4672+
info_field = key[5:]
4673+
if isinstance(op_value, str):
4674+
escaped_value = (
4675+
escape_sql_string(op_value)
4676+
.replace("%", "\\%")
4677+
.replace("_", "\\_")
4678+
)
4679+
condition_parts.append(
4680+
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype])::text NOT LIKE '%{escaped_value}%'"
4681+
)
4682+
else:
4683+
condition_parts.append(
4684+
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype])::text NOT LIKE '%{op_value}%'"
4685+
)
4686+
else:
4687+
if isinstance(op_value, str):
4688+
escaped_value = (
4689+
escape_sql_string(op_value)
4690+
.replace("%", "\\%")
4691+
.replace("_", "\\_")
4692+
)
4693+
condition_parts.append(
4694+
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype)::text NOT LIKE '%{escaped_value}%'"
4695+
)
4696+
else:
4697+
condition_parts.append(
4698+
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype)::text NOT LIKE '%{op_value}%'"
4699+
)
46704700
# Check if key starts with "info." prefix (for simple equality)
46714701
elif key.startswith("info."):
46724702
# Extract the field name after "info."
@@ -4756,6 +4786,7 @@ def parse_filter(
47564786
"project_id",
47574787
"manager_user_id",
47584788
"delete_time",
4789+
"related_id",
47594790
}
47604791

47614792
def process_condition(condition):

0 commit comments

Comments
 (0)