Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 53 additions & 14 deletions src/canton_mcp_server/canton_billing.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ async def _query_active_contracts_via_updates(
party: str,
extract: Callable[[str, dict, str], Optional[_T]],
stop_when: Optional[Callable[[_T], bool]] = None,
page_size: int = 1000,
page_size: int = 100,
Comment thread
sqhell marked this conversation as resolved.
end_offset: Optional[int] = None,
) -> list[_T]:
"""
Expand Down Expand Up @@ -780,7 +780,59 @@ async def _query_active_contracts_via_updates(
break

max_offset_this_page = begin

def _item_offset(item: dict) -> Optional[int]:
"""Return the offset for any update variant (Transaction,
OffsetCheckpoint, Reassignment, TopologyTransaction, ...) so the
cursor advances even on pages that contain no Transactions. If we
only looked at Transactions, a page of pure OffsetCheckpoints
would leave `max_offset_this_page == begin` and the outer loop
would break prematurely — missing all later transactions.
"""
upd = item.get("update", {}) if isinstance(item, dict) else {}
if not isinstance(upd, dict):
return None
# Try every known variant wrapper, both capitalized and flat.
for variant in (
"Transaction",
"OffsetCheckpoint",
"Reassignment",
"TopologyTransaction",
):
wrapper = upd.get(variant)
if isinstance(wrapper, dict):
value = wrapper.get("value")
if isinstance(value, dict) and value.get("offset") is not None:
return _coerce_offset(value.get("offset"))
for variant in (
"transaction",
"offsetCheckpoint",
"reassignment",
"topologyTransaction",
):
value = upd.get(variant)
if isinstance(value, dict) and value.get("offset") is not None:
return _coerce_offset(value.get("offset"))
return None

def _coerce_offset(raw) -> Optional[int]:
if isinstance(raw, (int, float)):
return int(raw)
if isinstance(raw, str):
try:
return int(raw)
except ValueError:
return None
return None

for item in items:
# Always try to advance the page cursor — regardless of whether
# the item is a Transaction. This is the fix for checkpoint-only
# pages that would otherwise terminate the walk prematurely.
item_off = _item_offset(item)
if item_off is not None and item_off > max_offset_this_page:
max_offset_this_page = item_off

update = item.get("update", {}) if isinstance(item, dict) else {}
tx_wrapper = update.get("Transaction") or {}
tx = tx_wrapper.get("value") if isinstance(tx_wrapper, dict) else None
Expand All @@ -789,19 +841,6 @@ async def _query_active_contracts_via_updates(
if not tx:
continue

raw_offset = tx.get("offset")
if isinstance(raw_offset, str):
try:
tx_offset = int(raw_offset)
except ValueError:
tx_offset = max_offset_this_page
elif isinstance(raw_offset, (int, float)):
tx_offset = int(raw_offset)
else:
tx_offset = max_offset_this_page
if tx_offset > max_offset_this_page:
max_offset_this_page = tx_offset

for ev in tx.get("events", []) or []:
created = ev.get("CreatedEvent") or ev.get("createdEvent")
if created:
Expand Down
Loading