diff --git a/apps/desktop/src/components/model-visibility-dialog.tsx b/apps/desktop/src/components/model-visibility-dialog.tsx index 0b92dba36..05a5e92cb 100644 --- a/apps/desktop/src/components/model-visibility-dialog.tsx +++ b/apps/desktop/src/components/model-visibility-dialog.tsx @@ -14,10 +14,9 @@ import { $visibleModels, collapseModelFamilies, effectiveVisibleKeys, - emptyProviderSentinelKey, - isProviderSentinel, modelVisibilityKey, - setVisibleModels + setVisibleModels, + toggleModelVisibility } from '@/store/model-visibility' import type { ModelOptionProvider, ModelOptionsResponse } from '@/types/hermes' @@ -61,25 +60,7 @@ export function ModelVisibilityDialog({ const visible = effectiveVisibleKeys(stored, providers) const toggle = (provider: ModelOptionProvider, model: string) => { - const next = new Set(effectiveVisibleKeys($visibleModels.get(), providers)) - const key = modelVisibilityKey(provider.slug, model) - const sentinel = emptyProviderSentinelKey(provider.slug) - - if (next.has(key)) { - next.delete(key) - - // Check if this was the last real model for this provider. - const remainingForProvider = [...next].some(k => k.startsWith(`${provider.slug}::`) && !isProviderSentinel(k)) - - if (!remainingForProvider) { - next.add(sentinel) - } - } else { - next.delete(sentinel) - next.add(key) - } - - setVisibleModels(next) + setVisibleModels(toggleModelVisibility($visibleModels.get(), providers, provider.slug, model)) } const q = search.trim().toLowerCase() diff --git a/apps/desktop/src/store/model-visibility.test.ts b/apps/desktop/src/store/model-visibility.test.ts index 90eccdf45..805493cd5 100644 --- a/apps/desktop/src/store/model-visibility.test.ts +++ b/apps/desktop/src/store/model-visibility.test.ts @@ -4,10 +4,13 @@ import type { ModelOptionProvider } from '@/types/hermes' import { collapseModelFamilies, + defaultVisibleKeys, effectiveVisibleKeys, emptyProviderSentinelKey, isProviderSentinel, - modelVisibilityKey + modelVisibilityKey, + resolveVisibleKeys, + toggleModelVisibility } from './model-visibility' const provider = (slug: string, models: string[]): ModelOptionProvider => ({ @@ -96,4 +99,133 @@ describe('model visibility', () => { expect(isProviderSentinel('openai::')).toBe(true) expect(isProviderSentinel('openai::gpt-4o')).toBe(false) }) + + it('resolveVisibleKeys preserves sentinels that effectiveVisibleKeys strips', () => { + const stored = new Set([emptyProviderSentinelKey('nous')]) + const providers = [provider('nous', ['hermes-x', 'hermes-y']), provider('ollama', ['qwen3:latest'])] + + const resolved = resolveVisibleKeys(stored, providers) + expect(resolved.has(emptyProviderSentinelKey('nous'))).toBe(true) + expect(resolved.has(modelVisibilityKey('nous', 'hermes-x'))).toBe(false) + // Un-customized providers still expand to their defaults. + expect(resolved.has(modelVisibilityKey('ollama', 'qwen3:latest'))).toBe(true) + + // Display variant drops the sentinel. + expect(effectiveVisibleKeys(stored, providers).has(emptyProviderSentinelKey('nous'))).toBe(false) + }) +}) + +describe('toggleModelVisibility', () => { + const providers = [provider('openai', ['gpt-a', 'gpt-b']), provider('nous', ['hermes-x', 'hermes-y'])] + + // Drive the handler the way the dialog does: feed each result back in as the + // next `stored`, so the persisted set is what the next toggle starts from. + const apply = (stored: Set | null, slug: string, model: string) => + toggleModelVisibility(stored, providers, slug, model) + + it('records a hide-all sentinel when the last model of a provider is toggled off', () => { + let stored: Set | null = null + stored = apply(stored, 'openai', 'gpt-a') + stored = apply(stored, 'openai', 'gpt-b') + + expect(stored.has(emptyProviderSentinelKey('openai'))).toBe(true) + expect(effectiveVisibleKeys(stored, providers).has(modelVisibilityKey('openai', 'gpt-a'))).toBe(false) + expect(effectiveVisibleKeys(stored, providers).has(modelVisibilityKey('openai', 'gpt-b'))).toBe(false) + }) + + it('keeps a hidden provider hidden when a different provider is toggled (regression for #43485)', () => { + // Hide ALL of nous — its sentinel is now stored. + let stored: Set | null = null + stored = apply(stored, 'nous', 'hermes-x') + stored = apply(stored, 'nous', 'hermes-y') + expect(stored.has(emptyProviderSentinelKey('nous'))).toBe(true) + + // Toggle a model in another provider. nous must NOT snap back on. + stored = apply(stored, 'openai', 'gpt-a') + + expect(stored.has(emptyProviderSentinelKey('nous'))).toBe(true) + const visible = effectiveVisibleKeys(stored, providers) + expect(visible.has(modelVisibilityKey('nous', 'hermes-x'))).toBe(false) + expect(visible.has(modelVisibilityKey('nous', 'hermes-y'))).toBe(false) + }) + + it('clears only the toggled provider sentinel when a model is re-enabled', () => { + let stored: Set | null = new Set([emptyProviderSentinelKey('openai'), emptyProviderSentinelKey('nous')]) + + stored = apply(stored, 'openai', 'gpt-a') + + expect(stored.has(emptyProviderSentinelKey('openai'))).toBe(false) + expect(stored.has(emptyProviderSentinelKey('nous'))).toBe(true) + const visible = effectiveVisibleKeys(stored, providers) + expect(visible.has(modelVisibilityKey('openai', 'gpt-a'))).toBe(true) + expect(visible.has(modelVisibilityKey('nous', 'hermes-x'))).toBe(false) + }) + + it('re-enabling one model of a hidden-all provider restores ONLY that model, not the curated defaults', () => { + // openai hidden-all, nous untouched. + let stored: Set | null = new Set([emptyProviderSentinelKey('openai')]) + + stored = apply(stored, 'openai', 'gpt-a') + + const visible = effectiveVisibleKeys(stored, providers) + expect(visible.has(modelVisibilityKey('openai', 'gpt-a'))).toBe(true) + // gpt-b is NOT restored — "you hid everything, you get back only what you re-enable". + expect(visible.has(modelVisibilityKey('openai', 'gpt-b'))).toBe(false) + }) + + it('re-hiding the last re-enabled model re-adds the sentinel (full round-trip)', () => { + let stored: Set | null = new Set([emptyProviderSentinelKey('openai')]) + + // Re-enable gpt-a (clears sentinel, set = {gpt-a}), then toggle it back off. + stored = apply(stored, 'openai', 'gpt-a') + expect(stored.has(emptyProviderSentinelKey('openai'))).toBe(false) + stored = apply(stored, 'openai', 'gpt-a') + + expect(stored.has(emptyProviderSentinelKey('openai'))).toBe(true) + expect(effectiveVisibleKeys(stored, providers).has(modelVisibilityKey('openai', 'gpt-a'))).toBe(false) + }) + + it('toggling from an empty (non-null) stored set adds the model without expanding defaults', () => { + // Empty-but-not-null = "everything hidden". resolveVisibleKeys short-circuits to {}. + const stored = new Set() + + const next = apply(stored, 'openai', 'gpt-a') + + expect(next.has(modelVisibilityKey('openai', 'gpt-a'))).toBe(true) + // No curated defaults were expanded for any provider. + expect(next.has(modelVisibilityKey('openai', 'gpt-b'))).toBe(false) + expect(next.has(modelVisibilityKey('nous', 'hermes-x'))).toBe(false) + }) + + it('toggling off one default model from null stored keeps the rest of the curated defaults', () => { + // null = "never customized": resolveVisibleKeys expands all defaults first. + const next = apply(null, 'openai', 'gpt-a') + + expect(next.has(modelVisibilityKey('openai', 'gpt-a'))).toBe(false) + expect(next.has(modelVisibilityKey('openai', 'gpt-b'))).toBe(true) + expect(next.has(modelVisibilityKey('nous', 'hermes-x'))).toBe(true) + // Other models remain, so no sentinel. + expect(next.has(emptyProviderSentinelKey('openai'))).toBe(false) + }) + + it('tolerates a provider with zero models (defensive — dialog filters these out)', () => { + const ps = [provider('empty', []), provider('openai', ['gpt-a'])] + const next = toggleModelVisibility(new Set([modelVisibilityKey('openai', 'gpt-a')]), ps, 'empty', 'ghost') + + // No crash; the phantom key is recorded but no defaults are invented. + expect([...next].some(k => k.startsWith('empty::') && !isProviderSentinel(k))).toBe(true) + expect(next.has(modelVisibilityKey('openai', 'gpt-a'))).toBe(true) + }) +}) + +describe('resolveVisibleKeys', () => { + const providers = [provider('openai', ['gpt-a', 'gpt-b']), provider('nous', ['hermes-x', 'hermes-y'])] + + it('returns the curated defaults verbatim for null stored', () => { + expect(resolveVisibleKeys(null, providers)).toEqual(defaultVisibleKeys(providers)) + }) + + it('returns an empty set for an empty (non-null) stored set', () => { + expect([...resolveVisibleKeys(new Set(), providers)]).toEqual([]) + }) }) diff --git a/apps/desktop/src/store/model-visibility.ts b/apps/desktop/src/store/model-visibility.ts index 5c2b568c5..44f15b4c3 100644 --- a/apps/desktop/src/store/model-visibility.ts +++ b/apps/desktop/src/store/model-visibility.ts @@ -106,19 +106,29 @@ export function defaultVisibleKeys(providers: readonly ModelOptionProvider[]): S const keys = new Set() for (const provider of providers) { - const families = collapseModelFamilies(provider.models ?? []) - - for (const family of families.slice(0, DEFAULT_VISIBLE_PER_PROVIDER)) { - keys.add(modelVisibilityKey(provider.slug, family.id)) - } + expandProviderDefaults(provider, keys) } return keys } -/** Resolve which keys are currently visible: the user's explicit set when - * configured, otherwise the curated default for the given providers. */ -export function effectiveVisibleKeys( +/** Add a provider's curated default model keys (top-N collapsed families) to + * `target`. Shared by `defaultVisibleKeys` and `resolveVisibleKeys` so the + * expansion rule lives in exactly one place. */ +function expandProviderDefaults(provider: ModelOptionProvider, target: Set): void { + const families = collapseModelFamilies(provider.models ?? []) + + for (const family of families.slice(0, DEFAULT_VISIBLE_PER_PROVIDER)) { + target.add(modelVisibilityKey(provider.slug, family.id)) + } +} + +/** Resolve the canonical working set: the user's stored keys plus the curated + * default expansion for any provider they haven't customized. Hide-all + * sentinels are PRESERVED here — this is the set the toggle handler mutates and + * persists, so dropping a sentinel would silently re-enable a provider the user + * emptied. Use `effectiveVisibleKeys` for display (sentinels stripped). */ +export function resolveVisibleKeys( stored: Set | null, providers: readonly ModelOptionProvider[] ): Set { @@ -134,22 +144,31 @@ export function effectiveVisibleKeys( for (const provider of providers) { const providerPrefix = `${provider.slug}::` + const hasStoredProvider = [...stored].some( key => key.startsWith(providerPrefix) && !isProviderSentinel(key) ) + const hasSentinel = stored.has(emptyProviderSentinelKey(provider.slug)) if (hasStoredProvider || hasSentinel) { continue } - const families = collapseModelFamilies(provider.models ?? []) - - for (const family of families.slice(0, DEFAULT_VISIBLE_PER_PROVIDER)) { - next.add(modelVisibilityKey(provider.slug, family.id)) - } + expandProviderDefaults(provider, next) } + return next +} + +/** Resolve which keys are currently visible for DISPLAY: the resolved working + * set with bookkeeping sentinels stripped (they are not real models). */ +export function effectiveVisibleKeys( + stored: Set | null, + providers: readonly ModelOptionProvider[] +): Set { + const next = resolveVisibleKeys(stored, providers) + // Strip sentinel keys — they are bookkeeping, not real visibility entries. for (const key of [...next]) { if (isProviderSentinel(key)) { @@ -159,3 +178,42 @@ export function effectiveVisibleKeys( return next } + +/** Compute the next persisted visibility set when one model row is toggled. + * Seeds from `resolveVisibleKeys` (NOT `effectiveVisibleKeys`) so other + * providers' hide-all sentinels survive the persist. When the last visible + * model of a provider is toggled off, a sentinel records the explicit + * hide-all; re-enabling a model clears THAT provider's sentinel (only). */ +export function toggleModelVisibility( + stored: Set | null, + providers: readonly ModelOptionProvider[], + providerSlug: string, + model: string +): Set { + // `resolveVisibleKeys` always returns a fresh Set, so we can mutate it directly. + const next = resolveVisibleKeys(stored, providers) + const key = modelVisibilityKey(providerSlug, model) + const sentinel = emptyProviderSentinelKey(providerSlug) + + if (next.has(key)) { + next.delete(key) + + // Check if this was the last real model for this provider. + const remainingForProvider = [...next].some( + k => k.startsWith(`${providerSlug}::`) && !isProviderSentinel(k) + ) + + if (!remainingForProvider) { + next.add(sentinel) + } + } else { + // Re-enabling promotes a previously hidden-all provider to an explicit + // set of exactly the one re-enabled model — the curated defaults are NOT + // restored. Intentional: "you hid everything, you get back only what you + // re-enable." (Locked in by the sentinel-clear-on-re-enable test.) + next.delete(sentinel) + next.add(key) + } + + return next +} diff --git a/cron/jobs.py b/cron/jobs.py index ef4ce4941..a120ad5d1 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -409,6 +409,31 @@ def _ensure_aware(dt: datetime) -> datetime: return dt.astimezone(target_tz) +def _timezone_offset_mismatch(stored: datetime, current: datetime) -> bool: + """Return True when a stored aware timestamp uses a different UTC offset. + + Naive stored timestamps return False: they carry no offset to compare, and + are normalized by ``_ensure_aware`` instead — they intentionally never take + the offset-repair path. + """ + if stored.tzinfo is None or current.tzinfo is None: + return False + return stored.utcoffset() != current.utcoffset() + + +def _stored_wall_clock_is_future(stored: datetime, current: datetime) -> bool: + """Return True when the stored local wall-clock time has not arrived yet. + + Cron schedules express local wall-clock intent. If Hermes/system local time + changes after next_run_at was persisted, an old offset can make a future + wall-clock run look due at the converted absolute time (for example + 21:00+10 becomes 13:00+02). Comparing naive wall-clock values lets us + distinguish that migration case from a genuinely missed run whose scheduled + wall time has already passed. + """ + return stored.replace(tzinfo=None) > current.replace(tzinfo=None) + + def _recoverable_oneshot_run_at( schedule: Dict[str, Any], now: datetime, @@ -1352,10 +1377,50 @@ def _get_due_jobs_locked() -> List[Dict[str, Any]]: needs_save = True break - next_run_dt = _ensure_aware(datetime.fromisoformat(next_run)) + raw_next_run_dt = datetime.fromisoformat(next_run) + schedule = job.get("schedule", {}) + kind = schedule.get("kind") + + next_run_dt = _ensure_aware(raw_next_run_dt) + # Migration repair: a cron job persists next_run_at as an absolute + # instant, but the cron expr describes local wall-clock intent. If the + # configured/system timezone changed after persistence, the stored + # instant's offset no longer matches now's, and its converted time can + # look due hours early (21:00+10 -> 13:00+02). When the stored *wall + # clock* is still in the future, recompute from the schedule so we fire + # at the intended local time instead of early-then-again. + # + # TRADE-OFF: this cannot distinguish a config/host TZ migration from a + # legitimate DST offset change. A DST boundary that satisfies all four + # conditions will recompute (and thus SKIP the pending occurrence, no + # catch-up) rather than fire it. Accepted: in the pure-migration case + # the recompute lands on the same wall-clock time later the same period, + # and DST-boundary collisions with a still-future stored wall clock are + # rare relative to the double-fire bug this prevents (#28934). + if ( + kind == "cron" + and next_run_dt <= now + and _timezone_offset_mismatch(raw_next_run_dt, now) + and _stored_wall_clock_is_future(raw_next_run_dt, now) + ): + new_next = compute_next_run(schedule, now.isoformat()) + if new_next: + logger.info( + "Job '%s' next_run_at offset changed (%s -> %s). " + "Recomputing cron run to preserve local wall-clock intent: %s", + job.get("name", job["id"]), + raw_next_run_dt.utcoffset(), + now.utcoffset(), + new_next, + ) + for rj in raw_jobs: + if rj["id"] == job["id"]: + rj["next_run_at"] = new_next + needs_save = True + break + continue + if next_run_dt <= now: - schedule = job.get("schedule", {}) - kind = schedule.get("kind") # For recurring jobs, check if the scheduled time is stale # (gateway was down and missed the window). Fast-forward to diff --git a/cron/scheduler.py b/cron/scheduler.py index e023d8fcd..af48de7c1 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -717,6 +717,27 @@ def _send_media_via_adapter( logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e) +def _confirm_adapter_delivery(send_result) -> bool: + """Return True only if ``send_result`` unambiguously confirms delivery. + + A live adapter that returns ``None`` (e.g. a swallowed exception, a busy + platform, or a code path that returns early without producing a + ``SendResult``) must NOT be treated as success — doing so causes the + scheduler to log ``"delivered to via live adapter"`` while the + gateway never actually sees the message (#47056). + + Likewise, an object missing a ``success`` attribute (e.g. a bare ``dict`` + or a partial mock) is a contract violation: it does not actually tell us + whether the send succeeded. Require an explicit, truthy ``success`` + attribute to count as confirmed. + """ + if send_result is None: + return False + if not hasattr(send_result, "success"): + return False + return bool(getattr(send_result, "success")) + + def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]: """ Deliver job output to the configured target(s) (origin chat, specific platform, etc.). @@ -730,11 +751,25 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option """ targets = _resolve_delivery_targets(job) if not targets: - if job.get("deliver", "local") != "local": - msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}" - logger.warning("Job '%s': %s", job["id"], msg) - return msg - return None # local-only jobs don't deliver — not a failure + deliver_value = _normalize_deliver_value(job.get("deliver", "local")) + if deliver_value == "local": + return None # local-only jobs don't deliver — not a failure + # deliver=origin with no resolvable origin and no configured home + # channels: treat as local rather than reporting an error. CLI-created + # jobs never capture a {platform, chat_id} origin, so failing here would + # make every CLI `deliver=origin` (or auto-detect) job emit a spurious + # "no delivery target resolved" error on every run (#43014). The output + # is still persisted in last_output for `cron list`/resume. + if deliver_value == "origin": + logger.info( + "Job '%s': deliver=origin but no origin or home channels — " + "skipping delivery (output saved in last_output)", + job.get("name", job.get("id", "?")), + ) + return None + msg = f"no delivery target resolved for deliver={deliver_value}" + logger.warning("Job '%s': %s", job["id"], msg) + return msg from tools.send_message_tool import _send_to_platform from gateway.config import load_gateway_config, Platform @@ -819,65 +854,213 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option delivered = False target_errors = [] if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)(): - send_metadata = {"thread_id": thread_id} if thread_id else None + # Telegram three-mode topic routing (#22773): a private chat + # (positive chat_id) with a NUMERIC topic id is a Bot API Direct + # Messages topic and must be addressed via ``direct_messages_topic_id`` + # — a bare ``message_thread_id`` is rejected/mis-routed by Bot API + # 10.0 and lands in General. Forum/supergroup targets (negative + # chat_id) and named DM-topic lanes keep the default thread_id + # handling. Compute the routed metadata ONCE so both the text send + # (via DeliveryRouter) and the media send use the same routing. + from gateway.delivery import ( + DeliveryRouter, + DeliveryTarget, + _looks_like_int, + _looks_like_telegram_private_chat_id, + ) + + is_private_dm_topic = ( + platform == Platform.TELEGRAM + and thread_id is not None + and _looks_like_telegram_private_chat_id(str(chat_id)) + and _looks_like_int(str(thread_id)) + ) + if is_private_dm_topic: + # Routed via direct_messages_topic_id (mode 2), no bare thread_id. + route_thread_id = None + route_metadata = { + "direct_messages_topic_id": str(thread_id), + "job_id": job["id"], + } + # Media metadata mirrors the text routing so attachments land in + # the same DM topic instead of the General lane (#22773). + media_metadata = {"direct_messages_topic_id": str(thread_id)} + else: + route_thread_id = str(thread_id) if thread_id is not None else None + route_metadata = {"job_id": job["id"]} + media_metadata = {"thread_id": thread_id} if thread_id else None + try: - # Send cleaned text (MEDIA tags stripped) — not the raw content + # Send cleaned text (MEDIA tags stripped) — not the raw content. + # Route through the gateway's DeliveryRouter so the live send + # gets the same platform-specific routing as live messages — + # in particular Telegram's three-mode topic routing. The + # standalone cron path lacked this, so DM-topic cron deliveries + # landed in the General topic or were rejected by Bot API 10.0 + # (#22773). text_to_send = cleaned_delivery_content.strip() adapter_ok = True + timed_out = False if text_to_send: from agent.async_utils import safe_schedule_threadsafe + + router = DeliveryRouter(config, adapters) + route_target = DeliveryTarget( + platform=platform, + chat_id=str(chat_id), + thread_id=route_thread_id, + is_explicit=True, + ) + # Pass thread routing via the target (not a bare metadata + # "thread_id"): the router only applies its Telegram DM-topic + # detection when "thread_id"/"message_thread_id" are absent + # from metadata, deriving the routing from target.thread_id + # or the explicit direct_messages_topic_id above. future = safe_schedule_threadsafe( - runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata), + router._deliver_to_platform( + route_target, + text_to_send, + route_metadata, + ), loop, ) if future is None: adapter_ok = False target_errors.append("live adapter event loop scheduling failed") else: + send_result = None + timeout_handled = False try: send_result = future.result(timeout=60) - except TimeoutError as te: - future.cancel() - target_errors.append(f"live adapter send timed out: {te}") - raise + except TimeoutError: + # #38922: a slow confirmation does NOT necessarily + # mean the send failed — but we must distinguish two + # cases via future.cancel()'s return value: + # + # cancel() == False -> the coroutine was already + # running on the gateway loop when the timeout + # fired; the request is in flight on the wire and + # cannot be un-sent. Re-sending via standalone + # would be a guaranteed DUPLICATE, so treat it as + # delivered (assume-delivered). + # + # cancel() == True -> the scheduled callback never + # started executing (loop wedged/backlogged for + # the full 60s), so nothing was sent. We MUST + # fall through to the standalone path or the + # message is silently dropped (worse than a + # duplicate). + cancelled = future.cancel() + if cancelled: + msg = ( + f"live adapter send to {platform_name}:{chat_id} " + "timed out before the coroutine was dispatched" + ) + logger.warning( + "Job '%s': %s, falling back to standalone", + job["id"], msg, + ) + target_errors.append(msg) + adapter_ok = False # fall through to standalone path + timeout_handled = True + else: + timed_out = True + timeout_handled = True + logger.warning( + "Job '%s': live adapter send to %s:%s timed out " + "after 60s; already dispatched (in flight), " + "assuming delivered (skipping standalone fallback " + "to avoid duplicate)", + job["id"], platform_name, chat_id, + ) except Exception as ex: + # A real send error (not a slow confirmation) — fall + # through to the standalone path so the message is + # still delivered. target_errors.append(f"live adapter send failed: {ex}") raise - if send_result is None or not getattr(send_result, "success", True): - err = getattr(send_result, "error", "unknown") if send_result else "no response from adapter" - msg = f"live adapter send to {platform_name}:{chat_id} failed: {err}" - logger.warning( - "Job '%s': %s, falling back to standalone", - job["id"], msg, - ) - target_errors.append(msg) - adapter_ok = False # fall through to standalone path - elif ( - send_result - and thread_id - and getattr(send_result, "raw_response", None) - and send_result.raw_response.get("thread_fallback") - ): - requested_thread_id = send_result.raw_response.get("requested_thread_id") or thread_id - msg = ( - f"configured thread_id {requested_thread_id} for " - f"{platform_name}:{chat_id} was not found; delivered without thread_id" - ) - logger.warning("Job '%s': %s", job["id"], msg) - delivery_errors.append(msg) - - # Send extracted media files as native attachments via the live adapter - if adapter_ok and media_files: + if timeout_handled: + # The timeout branch above already decided the + # outcome (assume-delivered if in flight, or + # adapter_ok=False to fall through if never + # dispatched). send_result is None, so skip the + # confirmation/thread-fallback inspection below. + pass + else: + # _deliver_to_platform returns either a SendResult + # (.success attr) or, when the silence-narration + # filter drops the message, a plain dict + # {"success": True, "delivered": False, ...}. + # Normalize both shapes so a getattr default doesn't + # misread a dict, and so a None / success-less object + # is NOT counted as delivered (#47056). + if isinstance(send_result, dict): + send_success = bool(send_result.get("success", False)) + send_raw_response = send_result.get("raw_response") + else: + send_success = _confirm_adapter_delivery(send_result) + send_raw_response = getattr(send_result, "raw_response", None) + + if not send_success: + if isinstance(send_result, dict): + err = send_result.get("error", "unknown") + shape = "dict" + elif send_result is not None: + err = getattr(send_result, "error", None) + shape = type(send_result).__name__ + else: + err = "no response from adapter" + shape = "None" + msg = ( + f"live adapter send to {platform_name}:{chat_id} " + f"returned unconfirmed result ({shape}, error={err})" + ) + logger.warning( + "Job '%s': %s, falling back to standalone", + job["id"], msg, + ) + target_errors.append(msg) + adapter_ok = False # fall through to standalone path + elif ( + send_raw_response + and thread_id + and send_raw_response.get("thread_fallback") + ): + requested_thread_id = send_raw_response.get("requested_thread_id") or thread_id + msg = ( + f"configured thread_id {requested_thread_id} for " + f"{platform_name}:{chat_id} was not found; delivered without thread_id" + ) + logger.warning("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) + + # Send extracted media files as native attachments via the live + # adapter, using the same DM-topic-aware routing as the text send + # (#22773 — media previously used a bare thread_id and landed in + # the General lane for private DM topics). Skip on an in-flight + # confirmation timeout: the gateway loop is contended, so each + # media send would also block its 30s budget, and the text + # payload is already assumed delivered (#38922). Record the + # skipped attachments so the drop is visible rather than silently + # lost. + if adapter_ok and not timed_out and media_files: _send_media_via_adapter( runtime_adapter, chat_id, media_files, - send_metadata, + media_metadata, loop, job, platform=platform, ) + elif timed_out and media_files: + msg = ( + f"{len(media_files)} media attachment(s) not delivered to " + f"{platform_name}:{chat_id} (live adapter confirmation timed out)" + ) + logger.warning("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) if adapter_ok: logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) diff --git a/hermes_cli/cron.py b/hermes_cli/cron.py index f15181dee..3c3116970 100644 --- a/hermes_cli/cron.py +++ b/hermes_cli/cron.py @@ -353,7 +353,14 @@ def _job_action(action: str, job_id: str, success_verb: str) -> int: if action in {"resume", "run"} and result.get("job", {}).get("next_run_at"): print(f" Next run: {result['job']['next_run_at']}") if action == "run": - print(" It will run on the next scheduler tick.") + job = result.get("job", {}) + if job.get("executed"): + outcome = "succeeded" if job.get("execution_success") else "failed" + print(f" Ran now: {outcome}.") + elif job.get("execution_skipped"): + print(f" {job['execution_skipped']}") + else: + print(" It will run on the next scheduler tick.") return 0 diff --git a/scripts/release.py b/scripts/release.py index cea433940..288ed7c2b 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -1472,6 +1472,7 @@ "beastant1@gmail.com": "nekwo", # PR #26481 (PS5.1 UTF-8 BOM) "43717185+nekwo@users.noreply.github.com": "nekwo", "9785479+stepanov1975@users.noreply.github.com": "stepanov1975", # PR #22074 (setup config picker writes) + "devsart95@gmail.com": "devsart95", # PR #23249 (cron Telegram DM topic delivery) "67979730+flooryyyy@users.noreply.github.com": "flooryyyy", # PR #26374 (tool_trace error detection) "188585318+dgians@users.noreply.github.com": "dgians", # PR #26034 (.ts/.py/.sh docs types) "zealy@tz.co": "dgians", # PR #26034 (bot-committed by zealy-tzco under dgians' PR) diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index d044f051f..f54041d05 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -849,6 +849,151 @@ def test_broken_interval_without_next_run_is_recovered(self, tmp_cron_dir, monke assert recovered_dt > now + def test_cron_next_run_offset_migration_is_rescheduled_not_fired(self, tmp_cron_dir, monkeypatch): + current_tz = timezone(timedelta(hours=2)) + now = datetime(2026, 5, 19, 13, 2, 0, tzinfo=current_tz) + monkeypatch.setattr("cron.jobs._hermes_now", lambda: now) + + # A 21:00 cron was stored while Hermes/system local time was UTC+10. + # After the host moves to UTC+02, that absolute timestamp converts to + # 13:00+02. At 13:02+02 the old code considered it due and fired, even + # though the user's local wall-clock cron intent is still 21:00. + save_jobs( + [{ + "id": "cron-tz-migrate", + "name": "Migrated local cron", + "prompt": "...", + "schedule": {"kind": "cron", "expr": "0 21 * * 2", "display": "0 21 * * 2"}, + "schedule_display": "0 21 * * 2", + "repeat": {"times": None, "completed": 0}, + "enabled": True, + "state": "scheduled", + "paused_at": None, + "paused_reason": None, + "created_at": "2026-05-12T21:00:00+10:00", + "next_run_at": "2026-05-19T21:00:00+10:00", + "last_run_at": "2026-05-12T21:00:00+10:00", + "last_status": "ok", + "last_error": None, + "deliver": "local", + "origin": None, + }] + ) + + assert get_due_jobs() == [] + repaired = datetime.fromisoformat(get_job("cron-tz-migrate")["next_run_at"]) + assert repaired == datetime(2026, 5, 19, 21, 0, 0, tzinfo=current_tz) + + def test_cron_offset_migration_does_not_repair_already_passed_wall_time(self, tmp_cron_dir, monkeypatch): + current_tz = timezone(timedelta(hours=2)) + now = datetime(2026, 5, 19, 13, 2, 0, tzinfo=current_tz) + monkeypatch.setattr("cron.jobs._hermes_now", lambda: now) + + save_jobs( + [{ + "id": "cron-tz-missed", + "name": "Migrated missed cron", + "prompt": "...", + "schedule": {"kind": "cron", "expr": "0 9 * * 2", "display": "0 9 * * 2"}, + "schedule_display": "0 9 * * 2", + "repeat": {"times": None, "completed": 0}, + "enabled": True, + "state": "scheduled", + "paused_at": None, + "paused_reason": None, + "created_at": "2026-05-12T09:00:00+10:00", + "next_run_at": "2026-05-19T09:00:00+10:00", + "last_run_at": "2026-05-12T09:00:00+10:00", + "last_status": "ok", + "last_error": None, + "deliver": "local", + "origin": None, + }] + ) + + # The wall-clock time has already passed, so this follows the existing + # stale-run fast-forward behavior instead of the timezone-migration + # repair path for future wall-clock runs. + assert get_due_jobs() == [] + repaired = datetime.fromisoformat(get_job("cron-tz-missed")["next_run_at"]) + assert repaired == datetime(2026, 5, 26, 9, 0, 0, tzinfo=current_tz) + + def test_same_tz_due_cron_still_fires(self, tmp_cron_dir, monkeypatch): + """Guard must NOT over-fire: a due cron in the SAME offset fires normally.""" + current_tz = timezone(timedelta(hours=2)) + now = datetime(2026, 5, 19, 21, 0, 30, tzinfo=current_tz) + monkeypatch.setattr("cron.jobs._hermes_now", lambda: now) + save_jobs([{ + "id": "cron-same-tz", "name": "same tz", "prompt": "...", + "schedule": {"kind": "cron", "expr": "0 21 * * 2", "display": "0 21 * * 2"}, + "schedule_display": "0 21 * * 2", + "repeat": {"times": None, "completed": 0}, + "enabled": True, "state": "scheduled", "paused_at": None, "paused_reason": None, + "created_at": "2026-05-12T21:00:00+02:00", + "next_run_at": "2026-05-19T21:00:00+02:00", # same offset as now + "last_run_at": "2026-05-12T21:00:00+02:00", + "last_status": "ok", "last_error": None, "deliver": "local", "origin": None, + }]) + # offset matches -> guard skips -> the genuinely-due job is returned to fire. + due = get_due_jobs() + assert [j["id"] for j in due] == ["cron-same-tz"] + + def test_interval_job_with_stale_offset_is_unaffected(self, tmp_cron_dir, monkeypatch): + """The offset-repair guard is cron-only; interval jobs never take it. + + A stale-offset interval job whose converted instant is well past the + grace window is handled by the pre-existing stale fast-forward path + (not the cron repair path). Verify it fast-forwards via interval math + (next = now + interval), proving the cron-only guard didn't touch it. + """ + current_tz = timezone(timedelta(hours=2)) + now = datetime(2026, 5, 19, 13, 2, 0, tzinfo=current_tz) + monkeypatch.setattr("cron.jobs._hermes_now", lambda: now) + save_jobs([{ + "id": "interval-stale-tz", "name": "interval", "prompt": "...", + "schedule": {"kind": "interval", "minutes": 60, "display": "every 1h"}, + "schedule_display": "every 1h", + "repeat": {"times": None, "completed": 0}, + "enabled": True, "state": "scheduled", "paused_at": None, "paused_reason": None, + "created_at": "2026-05-19T10:00:00+10:00", + "next_run_at": "2026-05-19T12:00:00+10:00", # stale offset, instant 04:00+02 (well past) + "last_run_at": "2026-05-19T11:00:00+10:00", + "last_status": "ok", "last_error": None, "deliver": "local", "origin": None, + }]) + get_due_jobs() + # The cron-only repair path would have produced a cron occurrence; instead + # the interval stale fast-forward recomputes next = now + 60m (interval + # math), confirming the guard did not intercept this interval job. + nr = datetime.fromisoformat(get_job("interval-stale-tz")["next_run_at"]) + assert nr == now + timedelta(minutes=60) + + def test_offset_migration_at_wall_clock_equal_now_falls_through(self, tmp_cron_dir, monkeypatch): + """Boundary: stored wall-clock == now wall-clock (strict >) does NOT take + the repair path — it falls through to the existing due/fast-forward logic.""" + current_tz = timezone(timedelta(hours=2)) + now = datetime(2026, 5, 19, 13, 0, 0, tzinfo=current_tz) + monkeypatch.setattr("cron.jobs._hermes_now", lambda: now) + save_jobs([{ + "id": "cron-wall-equal", "name": "wall equal", "prompt": "...", + "schedule": {"kind": "cron", "expr": "0 13 * * 2", "display": "0 13 * * 2"}, + "schedule_display": "0 13 * * 2", + "repeat": {"times": None, "completed": 0}, + "enabled": True, "state": "scheduled", "paused_at": None, "paused_reason": None, + "created_at": "2026-05-12T13:00:00+10:00", + # stored naive wall-clock 13:00 == now naive wall-clock 13:00 -> strict > is False + "next_run_at": "2026-05-19T13:00:00+10:00", + "last_run_at": "2026-05-12T13:00:00+10:00", + "last_status": "ok", "last_error": None, "deliver": "local", "origin": None, + }]) + # _stored_wall_clock_is_future is strict (>), so 13:00 == 13:00 is False + # -> repair guard skipped -> existing logic handles it (does not raise). + get_due_jobs() # must not raise / must not take the repair branch + # next_run_at must NOT have been rewritten to a future cron occurrence by + # the repair path (it either fires or fast-forwards via the normal path). + nr = get_job("cron-wall-equal")["next_run_at"] + assert nr is None or datetime.fromisoformat(nr).utcoffset() == now.utcoffset() or "+10:00" in nr + + class TestEnabledToolsets: def test_enabled_toolsets_stored(self, tmp_cron_dir): job = create_job(prompt="monitor", schedule="every 1h", enabled_toolsets=["web", "terminal"]) diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 0cfbba1c7..d3c2dd3a2 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -627,9 +627,15 @@ def test_live_adapter_sends_media_as_attachments(self, tmp_path, monkeypatch): # run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg) def fake_run_coro(coro, _loop): + # Actually run the routed coroutine (router._deliver_to_platform) + # so the underlying adapter.send is invoked, then wrap the real + # result in a completed Future (matching run_coroutine_threadsafe). + import asyncio as _asyncio future = Future() - future.set_result(MagicMock(success=True)) - coro.close() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) return future job = { @@ -678,9 +684,15 @@ def test_live_adapter_routes_image_to_send_image_file(self, tmp_path, monkeypatc loop.is_running.return_value = True def fake_run_coro(coro, _loop): + # Actually run the routed coroutine (router._deliver_to_platform) + # so the underlying adapter.send is invoked, then wrap the real + # result in a completed Future (matching run_coroutine_threadsafe). + import asyncio as _asyncio future = Future() - future.set_result(MagicMock(success=True)) - coro.close() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) return future job = { @@ -721,9 +733,15 @@ def test_live_adapter_media_only_no_text(self, tmp_path, monkeypatch): loop.is_running.return_value = True def fake_run_coro(coro, _loop): + # Actually run the routed coroutine (router._deliver_to_platform) + # so the underlying adapter.send is invoked, then wrap the real + # result in a completed Future (matching run_coroutine_threadsafe). + import asyncio as _asyncio future = Future() - future.set_result(MagicMock(success=True)) - coro.close() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) return future job = { @@ -765,9 +783,15 @@ def test_live_adapter_sends_cleaned_text_not_raw(self): loop.is_running.return_value = True def fake_run_coro(coro, _loop): + # Actually run the routed coroutine (router._deliver_to_platform) + # so the underlying adapter.send is invoked, then wrap the real + # result in a completed Future (matching run_coroutine_threadsafe). + import asyncio as _asyncio future = Future() - future.set_result(MagicMock(success=True)) - coro.close() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) return future job = { @@ -2708,15 +2732,20 @@ def mock_run_job(job): class TestDeliverResultTimeoutCancelsFuture: - """When future.result(timeout=60) raises TimeoutError in the live - adapter delivery path, _deliver_result must cancel the orphan - coroutine so it cannot duplicate-send after the standalone fallback. + """When future.result(timeout=60) raises TimeoutError in the live adapter + delivery path, the outcome depends on whether the coroutine was already + running. future.cancel() returning False means it is in flight on the wire + (cannot be un-sent) → treat as DELIVERED and skip the standalone fallback to + avoid a duplicate (#38922). future.cancel() returning True means it never + started (wedged loop) → nothing was sent, so fall through to standalone or + the message is silently dropped. Regression for #38922. """ - def test_live_adapter_timeout_cancels_future_and_falls_back(self): - """End-to-end: live adapter hangs past the 60s budget, _deliver_result - patches the timeout down to a fast value, confirms future.cancel() fires, - and verifies the standalone fallback path still delivers.""" + def test_live_adapter_timeout_assumes_delivered_no_duplicate(self): + """End-to-end: live adapter confirmation times out past the 60s budget. + The fix (#38922) treats the send as already-dispatched/delivered and + does NOT run the standalone fallback — otherwise the message is sent + twice.""" from gateway.config import Platform from concurrent.futures import Future @@ -2732,18 +2761,19 @@ def test_live_adapter_timeout_cancels_future_and_falls_back(self): loop = MagicMock() loop.is_running.return_value = True - # A real concurrent.futures.Future so .cancel() has real semantics, - # but we override .result() to raise TimeoutError exactly like the - # 60s wait firing in production. + # A real concurrent.futures.Future, but we override .result() to raise + # TimeoutError exactly like the 60s wait firing in production. We make + # .cancel() return False to simulate the coroutine being ALREADY RUNNING + # on the gateway loop (in flight on the wire) — the case where the send + # cannot be un-sent and a standalone resend would be a duplicate. captured_future = Future() cancel_calls = [] - original_cancel = captured_future.cancel - def tracking_cancel(): + def in_flight_cancel(): cancel_calls.append(True) - return original_cancel() + return False # already running — cannot be cancelled - captured_future.cancel = tracking_cancel + captured_future.cancel = in_flight_cancel captured_future.result = MagicMock(side_effect=TimeoutError("timed out")) def fake_run_coro(coro, _loop): @@ -2769,25 +2799,261 @@ def fake_run_coro(coro, _loop): loop=loop, ) - # 1. The orphan future was cancelled on timeout (the bug fix) - assert cancel_calls == [True], "future.cancel() must fire on TimeoutError" - # 2. The standalone fallback delivered — no double send, no silent drop + # 1. cancel() was attempted (returned False = in flight). + assert cancel_calls == [True], "future.cancel() should be attempted on TimeoutError" + # 2. Delivery is reported successful (no error string returned). assert result is None, f"expected successful delivery, got error: {result!r}" + # 3. The standalone fallback must NOT run — that is the #38922 fix: + # an in-flight confirmation timeout is assume-delivered, not a resend. + standalone_send.assert_not_awaited() + + def test_live_adapter_timeout_before_dispatch_falls_back_to_standalone(self): + """When the coroutine never started (loop wedged) — future.cancel() + returns True — nothing was sent, so _deliver_result MUST fall through + to the standalone path rather than silently dropping the message. + This is the inverse of the assume-delivered case and guards against the + wedged-loop silent drop.""" + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send.return_value = MagicMock(success=True) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + captured_future = Future() + cancel_calls = [] + + def never_dispatched_cancel(): + cancel_calls.append(True) + return True # callback never ran — successfully cancelled + + captured_future.cancel = never_dispatched_cancel + captured_future.result = MagicMock(side_effect=TimeoutError("timed out")) + + def fake_run_coro(coro, _loop): + coro.close() + return captured_future + + job = { + "id": "timeout-undispatched-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + + standalone_send = AsyncMock(return_value={"success": True}) + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \ + patch("tools.send_message_tool._send_to_platform", new=standalone_send): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + assert cancel_calls == [True], "future.cancel() should be attempted" + # The standalone path MUST run — the message was never sent. standalone_send.assert_awaited_once() + assert result is None, f"standalone should have delivered, got: {result!r}" + + def test_live_adapter_real_exception_falls_back_to_standalone(self): + """A non-timeout send Exception (real failure, not a slow confirmation) + must fall through to the standalone path so the message is still + delivered. Guards the `except Exception: raise` branch — the bug class + where broadening the timeout handler to swallow all exceptions would + silently drop messages.""" + from gateway.config import Platform + from concurrent.futures import Future - def test_live_adapter_thread_fallback_records_delivery_error(self): - """A cron target with an explicit topic must not be marked clean if - Telegram falls back to the base chat after "thread not found". + adapter = AsyncMock() + adapter.send.return_value = MagicMock(success=True) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + captured_future = Future() + captured_future.result = MagicMock(side_effect=RuntimeError("adapter exploded")) + + def fake_run_coro(coro, _loop): + coro.close() + return captured_future + + job = { + "id": "send-error-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + + standalone_send = AsyncMock(return_value={"success": True}) + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \ + patch("tools.send_message_tool._send_to_platform", new=standalone_send): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + # A real exception must NOT be assume-delivered: standalone runs. + standalone_send.assert_awaited_once() + assert result is None, f"standalone should have delivered, got: {result!r}" + + def test_live_adapter_private_dm_topic_routes_via_direct_messages_topic_id(self): + """#22773: a cron target to a PRIVATE Telegram chat with a numeric topic + id must be routed via ``direct_messages_topic_id`` (Bot API DM topics), + NOT a bare ``message_thread_id`` (which Bot API 10.0 rejects / mis-routes + to General). The cron live-adapter path routes through the gateway + DeliveryRouter, which applies the same three-mode routing as live + messages. """ from gateway.config import Platform from gateway.platforms.base import SendResult from concurrent.futures import Future + send_result = SendResult(success=True, message_id="42") + adapter = MagicMock() + adapter.send = AsyncMock(return_value=send_result) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + # DeliveryRouter consults the silence-narration config flag. + mock_cfg.filter_silence_narration = False + + loop = MagicMock() + loop.is_running.return_value = True + + job = { + "id": "dm-topic-job", + "deliver": "telegram:226252250:7072", # private chat + numeric topic + } + + def fake_run_coro(coro, _loop): + import asyncio as _asyncio + future = Future() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) + return future + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + assert result is None, f"expected clean delivery, got: {result!r}" + adapter.send.assert_called_once() + sent_chat_id, sent_text = adapter.send.call_args[0][0], adapter.send.call_args[0][1] + sent_metadata = adapter.send.call_args[1]["metadata"] + assert sent_chat_id == "226252250" + assert sent_text == "Hello world" + # The topic must be addressed via direct_messages_topic_id, and a bare + # message_thread_id must NOT be set (that is the Bot API 10.0 bug). + assert str(sent_metadata.get("direct_messages_topic_id")) == "7072" + assert not sent_metadata.get("message_thread_id") + + def test_live_adapter_private_dm_topic_media_routes_via_direct_messages_topic_id(self, tmp_path, monkeypatch): + """#22773 (media): MEDIA attachments to a private DM topic must also be + routed via ``direct_messages_topic_id``, not a bare ``message_thread_id`` + — the media path previously used the bare thread_id and landed + attachments in the General lane.""" + from gateway.config import Platform + from gateway.platforms.base import SendResult + from concurrent.futures import Future + + media_root = tmp_path / "media-cache" + media_file = media_root / "chart.png" + media_file.parent.mkdir(parents=True, exist_ok=True) + media_file.write_bytes(b"media") + monkeypatch.setattr( + "gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS", + (media_root,), + ) + media_path = media_file.resolve() + + adapter = AsyncMock() + adapter.send.return_value = SendResult(success=True, message_id="1") + adapter.send_image_file.return_value = SendResult(success=True, message_id="2") + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + mock_cfg.filter_silence_narration = False + + loop = MagicMock() + loop.is_running.return_value = True + + job = { + "id": "dm-topic-media-job", + "deliver": "telegram:226252250:7072", # private chat + numeric topic + } + + def fake_run_coro(coro, _loop): + import asyncio as _asyncio + future = Future() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) + return future + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + _deliver_result( + job, + f"Chart attached\nMEDIA:{media_path}", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + adapter.send_image_file.assert_called_once() + media_metadata = adapter.send_image_file.call_args[1]["metadata"] + assert str(media_metadata.get("direct_messages_topic_id")) == "7072" + assert not media_metadata.get("message_thread_id") + assert not media_metadata.get("thread_id") + + def test_live_adapter_forum_thread_fallback_records_delivery_error(self): + """A forum/supergroup cron target whose configured topic is gone must + NOT be reported as a clean delivery: when the Telegram adapter falls + back to the base chat (raw_response thread_fallback), the scheduler must + record the "delivered without thread_id" delivery error. Regression + coverage for the thread_fallback-recording branch (kept distinct from + the #22773 routing fix).""" + from gateway.config import Platform + from gateway.platforms.base import SendResult + from concurrent.futures import Future + send_result = SendResult( success=True, message_id="42", raw_response={ - "requested_thread_id": 7072, + "requested_thread_id": 17, "thread_fallback": True, }, ) @@ -2798,41 +3064,159 @@ def test_live_adapter_thread_fallback_records_delivery_error(self): pconfig.enabled = True mock_cfg = MagicMock() mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + mock_cfg.filter_silence_narration = False loop = MagicMock() loop.is_running.return_value = True + # Forum supergroup (negative chat_id) + numeric topic → mode 1 + # (message_thread_id); NOT a private DM topic. job = { - "id": "thread-fallback-job", - "deliver": "telegram:226252250:7072", + "id": "forum-fallback-job", + "deliver": "telegram:-1001234567890:17", } + def fake_run_coro(coro, _loop): + import asyncio as _asyncio + future = Future() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) + return future + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + assert result is not None + assert "was not found; delivered without thread_id" in result + # Forum target routes via message_thread_id (mode 1), not DM-topic. + sent_metadata = adapter.send.call_args[1]["metadata"] + assert not sent_metadata.get("direct_messages_topic_id") + + +class TestDeliverResultLiveAdapterUnconfirmed: + """Regression for #47056. + + When a live adapter's send() returns ``None`` (swallowed exception / busy + platform) or a result object that lacks an explicit ``success`` attribute + (bare dict / partial object), the scheduler must NOT log "delivered via + live adapter" and silently drop the message. Every unconfirmed shape must + fall through to the standalone delivery path so the message actually + arrives. The pre-fix check ``send_result is None or not getattr(..., + "success", True)`` let a ``.success``-less object default to True = silent + success. + """ + + def _run(self, send_value): + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send.return_value = send_value + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + completed_future = Future() - completed_future.set_result(send_result) + completed_future.set_result(send_value) def fake_run_coro(coro, _loop): coro.close() return completed_future + job = { + "id": "unconfirmed-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + + standalone_send = AsyncMock(return_value={"success": True}) + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ - patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \ + patch("tools.send_message_tool._send_to_platform", new=standalone_send): result = _deliver_result( job, "Hello world", adapters={Platform.TELEGRAM: adapter}, loop=loop, ) + return result, standalone_send - assert result == ( - "configured thread_id 7072 for telegram:226252250 was not found; " - "delivered without thread_id" - ) - adapter.send.assert_called_once_with( - "226252250", - "Hello world", - metadata={"thread_id": "7072"}, - ) + def test_none_result_falls_through_to_standalone(self): + """send() returning None must trigger the standalone fallback, not a + silent "delivered" log.""" + result, standalone_send = self._run(None) + assert result is None, f"standalone should have delivered, got: {result!r}" + standalone_send.assert_awaited_once() + + def test_result_missing_success_attr_falls_through(self): + """A result object with no ``success`` attribute is a contract + violation and must NOT be counted as delivered (it defaulted to True + before the fix).""" + class _NoSuccess: + pass + + result, standalone_send = self._run(_NoSuccess()) + assert result is None, f"standalone should have delivered, got: {result!r}" + standalone_send.assert_awaited_once() + + def test_confirmed_success_does_not_fall_through(self): + """A genuine SendResult(success=True) is confirmed — the standalone + path must NOT run (no duplicate).""" + result, standalone_send = self._run(MagicMock(success=True, raw_response=None)) + assert result is None + standalone_send.assert_not_awaited() + + +class TestDeliverOriginUnresolvableIsLocal: + """Regression for #43014. + + A cron job created in a CLI session has no {platform, chat_id} origin. + With ``deliver=origin`` (or auto-detect / deliver=None) and no configured + platform home channel, delivery is unresolvable — but that is the EXPECTED + state for CLI jobs, not an error. _deliver_result must return None (treat + as local; output stays in last_output), not the "no delivery target + resolved" error string that previously fired on every run. + """ + + def _deliver(self, job, monkeypatch): + import cron.scheduler as sched + # No home channel for any platform → origin is unresolvable. + monkeypatch.setattr(sched, "_get_home_target_chat_id", lambda *_: "") + return _deliver_result(job, "CLI bulletin") + + def test_origin_with_no_home_channels_returns_none(self, monkeypatch): + job = {"id": "cli-job", "deliver": "origin", "origin": "cli-session-provenance"} + assert self._deliver(job, monkeypatch) is None + + def test_omitted_deliver_autodetect_returns_none(self, monkeypatch): + # deliver key present but None (auto-detect) previously errored with + # "no delivery target resolved for deliver=None". + job = {"id": "cli-job", "deliver": None, "origin": "cli-session-provenance"} + assert self._deliver(job, monkeypatch) is None + + def test_explicit_platform_with_no_channel_still_errors(self, monkeypatch): + # A concrete platform target that cannot resolve is still a real error + # (this must NOT be silently swallowed by the origin→local fallback). + job = {"id": "tg-job", "deliver": "telegram"} + result = self._deliver(job, monkeypatch) + assert result is not None + assert "no delivery target resolved" in result class TestSendMediaTimeoutCancelsFuture: diff --git a/tests/tools/test_cronjob_run_immediate.py b/tests/tools/test_cronjob_run_immediate.py new file mode 100644 index 000000000..9efa60e82 --- /dev/null +++ b/tests/tools/test_cronjob_run_immediate.py @@ -0,0 +1,81 @@ +"""Tests for cronjob action='run' immediate execution (#41037). + +Before this fix, `cronjob(action='run')` only set next_run_at=now and returned +success, relying on the scheduler ticker to actually run the job. With no +gateway/ticker active (e.g. a CLI-only Windows setup) the job never executed and +last_run_at stayed null forever. Now action='run' claims the job (at-most-once, +blocking a concurrent tick) and fires it inline via the shared run_one_job body. +""" +import json +from unittest.mock import patch + +from tools.cronjob_tools import cronjob, _execute_job_now + + +_JOB = {"id": "job-run-1", "name": "manual run", "prompt": "hi", + "schedule": {"kind": "cron", "expr": "0 9 * * *"}} + + +class TestCronjobRunExecutesImmediately: + def test_run_action_claims_and_fires_via_run_one_job(self): + """action='run' must claim the job then fire it through run_one_job.""" + ran = {"job": "after-run", "last_status": "ok", "last_error": None} + with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \ + patch("tools.cronjob_tools.claim_job_for_fire", return_value=True) as m_claim, \ + patch("cron.scheduler.run_one_job", return_value=True) as m_run, \ + patch("tools.cronjob_tools.get_job", return_value=ran): + out = json.loads(cronjob(action="run", job_id="job-run-1")) + + assert out["success"] is True + assert out["job"]["executed"] is True + assert out["job"]["execution_success"] is True + m_claim.assert_called_once_with("job-run-1") # at-most-once claim taken + m_run.assert_called_once() # fired via the shared body + + def test_run_skips_when_claim_lost(self): + """If the scheduler already holds the fire claim, do NOT double-run.""" + with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \ + patch("tools.cronjob_tools.claim_job_for_fire", return_value=False), \ + patch("cron.scheduler.run_one_job") as m_run, \ + patch("tools.cronjob_tools.get_job", return_value=dict(_JOB)): + out = json.loads(cronjob(action="run", job_id="job-run-1")) + + assert out["success"] is True + assert out["job"]["executed"] is False + assert out["job"]["execution_success"] is False + assert "execution_skipped" in out["job"] + m_run.assert_not_called() # claim lost -> never fired + + def test_run_reports_failure_from_last_status(self): + """A failed run is reported via the re-read job's last_status/last_error.""" + failed = {"id": "job-run-1", "last_status": "error", "last_error": "provider 500"} + with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \ + patch("tools.cronjob_tools.claim_job_for_fire", return_value=True), \ + patch("cron.scheduler.run_one_job", return_value=True), \ + patch("tools.cronjob_tools.get_job", return_value=failed): + out = json.loads(cronjob(action="run", job_id="job-run-1")) + + assert out["job"]["executed"] is True + assert out["job"]["execution_success"] is False + assert out["job"]["execution_error"] == "provider 500" + + def test_execute_job_now_bails_without_claim(self): + """_execute_job_now never calls run_one_job when the claim is lost.""" + with patch("tools.cronjob_tools.claim_job_for_fire", return_value=False), \ + patch("cron.scheduler.run_one_job") as m_run: + res = _execute_job_now(dict(_JOB)) + assert res["claimed"] is False + assert res["success"] is False + m_run.assert_not_called() + + def test_execute_job_now_marks_failure_on_exception(self): + """An exception during fire is captured, marked failed, not propagated.""" + with patch("tools.cronjob_tools.claim_job_for_fire", return_value=True), \ + patch("cron.scheduler.run_one_job", side_effect=RuntimeError("boom")), \ + patch("tools.cronjob_tools.mark_job_run") as m_mark, \ + patch("tools.cronjob_tools.get_job", return_value=dict(_JOB)): + res = _execute_job_now(dict(_JOB)) + assert res["claimed"] is True + assert res["success"] is False + assert "boom" in res["error"] + m_mark.assert_called_once() diff --git a/tests/tools/test_mcp_capability_gating.py b/tests/tools/test_mcp_capability_gating.py index 551af1340..95fddb110 100644 --- a/tests/tools/test_mcp_capability_gating.py +++ b/tests/tools/test_mcp_capability_gating.py @@ -254,6 +254,12 @@ def test_substring_fallback(self): from tools.mcp_tool import _is_method_not_found_error assert _is_method_not_found_error(Exception("Method not found")) is True + def test_unknown_method_phrasing_is_match(self): + # agentmemory's MCP server surfaces method-not-found as a plain + # "Unknown method: ping" string with no structural -32601 code (#50028). + from tools.mcp_tool import _is_method_not_found_error + assert _is_method_not_found_error(Exception("Unknown method: ping")) is True + def test_unrelated_exception_is_not_match(self): from tools.mcp_tool import _is_method_not_found_error assert _is_method_not_found_error(TimeoutError()) is False @@ -295,6 +301,23 @@ async def test_falls_back_to_list_tools_on_method_not_found(self): task.session.list_tools.assert_awaited_once() assert task._ping_unsupported is True + async def test_falls_back_on_unknown_method_string(self): + """Regression for #50028: a server that surfaces method-not-found as a + plain "Unknown method: ping" string (no structural -32601 code) must + still latch the fallback and use list_tools, NOT reconnect-loop.""" + task = MCPServerTask("test") + task.initialize_result = _caps(tools=SimpleNamespace()) + task.session = SimpleNamespace( + send_ping=AsyncMock(side_effect=Exception("Unknown method: ping")), + list_tools=AsyncMock(return_value=SimpleNamespace(tools=[])), + ) + + await task._keepalive_probe() + + task.session.send_ping.assert_awaited_once() + task.session.list_tools.assert_awaited_once() + assert task._ping_unsupported is True + async def test_latch_skips_ping_on_subsequent_cycles(self): task = MCPServerTask("test") task.initialize_result = _caps(tools=SimpleNamespace()) diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index a0bb7c12b..58dd3d77f 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -25,14 +25,16 @@ from cron.jobs import ( AmbiguousJobReference, + claim_job_for_fire, create_job, + get_job, list_jobs, + mark_job_run, parse_schedule, pause_job, remove_job, resolve_job_ref, resume_job, - trigger_job, update_job, ) @@ -744,6 +746,51 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]: return result +def _execute_job_now(job: Dict[str, Any]) -> Dict[str, Any]: + """Execute a cron job immediately, outside the scheduler tick. + + Atomically claims the job first via ``claim_job_for_fire`` — the same + at-most-once CAS the scheduler/external-provider fire path uses — so a + concurrently-running gateway ticker cannot also fire it (the claim both + blocks a duplicate fire and advances ``next_run_at`` for recurring jobs). + If the claim is lost (another fire is in flight), this is a no-op. + + The actual firing is delegated to ``run_one_job`` — the single shared + execute→save→deliver→mark body the ticker and external providers use — so + failure delivery, ``[SILENT]`` handling, and live-adapter delivery stay + identical across paths and can't drift. + + Returns {"claimed": bool, "success": bool, "error": str|None}. + """ + job_id = job["id"] + try: + from cron.scheduler import run_one_job + + # At-most-once claim: bail without running if a tick/other fire owns it. + if not claim_job_for_fire(job_id): + return {"claimed": False, "success": False, + "error": "Job is already being fired by the scheduler; not run again."} + + # run_one_job records last_run_at/last_status via mark_job_run (which + # also clears the fire claim) and returns True iff it processed the job. + processed = run_one_job(job) + refreshed = get_job(job_id) or {} + ok = refreshed.get("last_status") == "ok" + return { + "claimed": True, + "success": bool(processed and ok), + "error": refreshed.get("last_error"), + } + + except Exception as e: + logger.error("Failed to execute cron job %s immediately: %s", job_id, e) + try: + mark_job_run(job_id, False, str(e)) + except Exception: + pass + return {"claimed": True, "success": False, "error": str(e)} + + def cronjob( action: str, job_id: Optional[str] = None, @@ -933,9 +980,24 @@ def cronjob( return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) if normalized in {"run", "run_now", "trigger"}: - updated = trigger_job(job_id) _reset_cron_failure(task_id) - return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) + # Execute the job immediately rather than only scheduling it for the + # next scheduler tick — a manual `run` should actually run, even when + # no gateway/ticker is active (the #41037 case). The claim inside + # _execute_job_now advances next_run_at and blocks a concurrent tick + # from double-firing. + exec_result = _execute_job_now(job) + # Re-read so the response reflects the post-run last_run_at/last_status. + result = _format_job(get_job(job_id) or {"id": job_id}) + result["executed"] = exec_result.get("claimed", False) + result["execution_success"] = exec_result.get("success", False) + if not exec_result.get("claimed", False): + result["execution_skipped"] = ( + "Already being fired by the scheduler; not run again." + ) + elif exec_result.get("error"): + result["execution_error"] = exec_result["error"] + return json.dumps({"success": True, "job": result}, indent=2) if normalized == "update": updates: Dict[str, Any] = {} diff --git a/tools/mcp_tool.py b/tools/mcp_tool.py index a3e8de885..85317b30a 100644 --- a/tools/mcp_tool.py +++ b/tools/mcp_tool.py @@ -438,6 +438,13 @@ def _is_method_not_found_error(exc: BaseException) -> bool: an empty result. Structurally inspect ``McpError.error.code`` first, then fall back to a substring match so detection survives SDK version drift and servers that surface the condition as a plain message. + + The substring fallback matters when a server reports method-not-found + without a structural ``-32601`` code (e.g. surfaced as a plain exception + string). Besides the canonical "method not found", many JSON-RPC + implementations phrase it as "Unknown method: " — agentmemory's MCP + server is one such case (#50028). Without matching that phrasing the + ping→list_tools fallback never latches and the keepalive reconnect-loops. """ # Structural: mcp.shared.exceptions.McpError carries ErrorData.code. err = getattr(exc, "error", None) @@ -450,6 +457,7 @@ def _is_method_not_found_error(exc: BaseException) -> bool: return ( str(_JSONRPC_METHOD_NOT_FOUND) in msg or "method not found" in msg + or "unknown method" in msg or "not found: ping" in msg )