Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
Changelog
=========

Changes in Version 4.17.0 (2026/XX/XX)
--------------------------------------

PyMongo 4.17 brings a number of changes including:

- Added the :meth:`~pymongo.asynchronous.client_session.AsyncClientSession.bind` and :meth:`~pymongo.client_session.ClientSession.bind` methods
that allow users to bind a session to all database operations within the scope of a context manager instead of having to explicitly pass the session to each individual operation.
See <PLACEHOLDER> for examples and more information.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<PLACEHOLDER> should be the MongoDB docs ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah once we have examples and such added to a page I'll update these spots.

Comment thread
NoahStapp marked this conversation as resolved.
Comment thread
NoahStapp marked this conversation as resolved.

Changes in Version 4.16.0 (2026/01/07)
--------------------------------------

Expand Down
28 changes: 27 additions & 1 deletion pymongo/asynchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import time
import uuid
from collections.abc import Mapping as _Mapping
from contextvars import ContextVar
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -153,6 +154,8 @@
TypeVar,
)

from _contextvars import Token
Comment thread
NoahStapp marked this conversation as resolved.
Outdated

from bson.binary import Binary
from bson.int64 import Int64
from bson.timestamp import Timestamp
Expand Down Expand Up @@ -181,6 +184,14 @@

_IS_SYNC = False

_SESSION: ContextVar[Optional[_AsyncBoundClientSession]] = ContextVar("SESSION", default=None)


class _AsyncBoundClientSession:
def __init__(self, session: AsyncClientSession, client_id: int):
self.session = session
self.client_id = client_id


class SessionOptions:
"""Options for a new :class:`AsyncClientSession`.
Expand Down Expand Up @@ -517,6 +528,9 @@ def __init__(
self._attached_to_cursor = False
# Should we leave the session alive when the cursor is closed?
self._leave_alive = False
# Is this session bound to a scope?
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I had to look up what "scope" means in this context so maybe "# Is this session bound to a context manager scope?"

self._bound = False
self._session_token: Optional[Token[_AsyncBoundClientSession]] = None

async def end_session(self) -> None:
"""Finish this session. If a transaction has started, abort it.
Expand Down Expand Up @@ -547,11 +561,23 @@ def _check_ended(self) -> None:
if self._server_session is None:
raise InvalidOperation("Cannot use ended session")

def bind(self) -> AsyncClientSession:
self._bound = True
return self

Comment thread
NoahStapp marked this conversation as resolved.
Outdated
async def __aenter__(self) -> AsyncClientSession:
if self._bound:
bound_session = _AsyncBoundClientSession(self, id(self._client))
self._session_token = _SESSION.set(bound_session) # type: ignore[assignment]
return self

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self._end_session(lock=True)
if self._session_token:
_SESSION.reset(self._session_token) # type: ignore[arg-type]
self._session_token = None
self._bound = False
else:
await self._end_session(lock=True)
Comment thread
NoahStapp marked this conversation as resolved.
Outdated

@property
def client(self) -> AsyncMongoClient[Any]:
Expand Down
30 changes: 23 additions & 7 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
from pymongo.asynchronous import client_session, database, uri_parser
from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream
from pymongo.asynchronous.client_bulk import _AsyncClientBulk
from pymongo.asynchronous.client_session import _EmptyServerSession
from pymongo.asynchronous.client_session import _SESSION, _EmptyServerSession
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.asynchronous.settings import TopologySettings
from pymongo.asynchronous.topology import Topology, _ErrorContext
Expand Down Expand Up @@ -1408,7 +1408,8 @@ def start_session(
def _ensure_session(
self, session: Optional[AsyncClientSession] = None
) -> Optional[AsyncClientSession]:
"""If provided session is None, lend a temporary session."""
"""If provided session and bound session are None, lend a temporary session."""
session = session or self._get_bound_session()
if session:
return session

Expand Down Expand Up @@ -2267,11 +2268,14 @@ async def _tmp_session(
self, session: Optional[client_session.AsyncClientSession]
) -> AsyncGenerator[Optional[client_session.AsyncClientSession], None]:
"""If provided session is None, lend a temporary session."""
if session is not None:
if not isinstance(session, client_session.AsyncClientSession):
raise ValueError(
f"'session' argument must be an AsyncClientSession or None, not {type(session)}"
)
if session is not None and not isinstance(session, client_session.AsyncClientSession):
raise ValueError(
f"'session' argument must be an AsyncClientSession or None, not {type(session)}"
)

# Check for a bound session. If one exists, treat it as an explicitly passed session.
session = session or self._get_bound_session()
if session:
Comment thread
NoahStapp marked this conversation as resolved.
# Don't call end_session.
yield session
return
Expand Down Expand Up @@ -2301,6 +2305,18 @@ async def _process_response(
if session is not None:
session._process_response(reply)

def _get_bound_session(self) -> Optional[AsyncClientSession]:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is encapsulated in a separate utility function because cursor operations call _ensure_session directly, while most other database operations call _tmp_session instead. Separating out this check keeps the clarity of the current behavioral differences with minimal code duplication.

bound_session = _SESSION.get()
if bound_session:
if bound_session.client_id == id(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we replace this with session.client is self? Then we can get rid of the _AsyncBoundClientSession class altogether.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question about 5edd9a0, this issue is addressed in that commit correct (and if so how) ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shane's suggestion was implemented and the _AsyncBoundClientSession removed.

return bound_session.session
else:
raise InvalidOperation(
"Only the client that created the bound session can perform operations within its context block. See <PLACEHOLDER> for more information."
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another <PLACEHOLDER> here … assuming that maybe these come out after merge?

Comment thread
NoahStapp marked this conversation as resolved.
)
else:
return None

async def server_info(
self, session: Optional[client_session.AsyncClientSession] = None
) -> dict[str, Any]:
Expand Down
28 changes: 27 additions & 1 deletion pymongo/synchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import time
import uuid
from collections.abc import Mapping as _Mapping
from contextvars import ContextVar
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -152,6 +153,8 @@
TypeVar,
)

from _contextvars import Token
Comment thread
NoahStapp marked this conversation as resolved.
Outdated

from bson.binary import Binary
from bson.int64 import Int64
from bson.timestamp import Timestamp
Expand Down Expand Up @@ -180,6 +183,14 @@

_IS_SYNC = True

_SESSION: ContextVar[Optional[_BoundClientSession]] = ContextVar("SESSION", default=None)


class _BoundClientSession:
def __init__(self, session: ClientSession, client_id: int):
self.session = session
self.client_id = client_id


class SessionOptions:
"""Options for a new :class:`ClientSession`.
Expand Down Expand Up @@ -516,6 +527,9 @@ def __init__(
self._attached_to_cursor = False
# Should we leave the session alive when the cursor is closed?
self._leave_alive = False
# Is this session bound to a scope?
self._bound = False
self._session_token: Optional[Token[_BoundClientSession]] = None

def end_session(self) -> None:
"""Finish this session. If a transaction has started, abort it.
Expand Down Expand Up @@ -546,11 +560,23 @@ def _check_ended(self) -> None:
if self._server_session is None:
raise InvalidOperation("Cannot use ended session")

def bind(self) -> ClientSession:
self._bound = True
return self

Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bind() mutates session state (self._bound = True) and returns self, which changes the meaning of using the session itself as a context manager. In particular, with client.start_session().bind() as s: (or calling s.bind() earlier and later doing with s:) will no longer end the session on exit, leaking server sessions and breaking the long-standing contract documented in this module. Consider making bind() return a dedicated context manager (separate object) that only sets/resets the bound-session ContextVar, while keeping ClientSession.__enter__/__exit__ semantics unchanged (always ending the session).

Suggested change
def bind(self) -> ClientSession:
self._bound = True
return self
class _BindContext(ContextManager["ClientSession"]):
"""Context manager used by ClientSession.bind().
Temporarily marks the session as bound so that __enter__/__exit__
manage the bound-session ContextVar, and then restores the previous
bound state on exit.
"""
def __init__(self, session: "ClientSession") -> None:
self._session = session
self._prev_bound = session._bound
def __enter__(self) -> "ClientSession":
# Mark the session as bound for the duration of this context and
# reuse ClientSession.__enter__ to set the ContextVar.
self._session._bound = True
return self._session.__enter__()
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional["TracebackType"],
) -> None:
try:
# Delegate to ClientSession.__exit__ to reset the ContextVar
# without ending the session when bound.
self._session.__exit__(exc_type, exc_val, exc_tb)
finally:
# Restore the previous bound state.
self._session._bound = self._prev_bound
def bind(self) -> ContextManager["ClientSession"]:
"""Return a context manager that binds this session to the current context.
Using ``with session.bind():`` will temporarily bind the session to the
bound-session ContextVar without permanently changing the session's
behavior when used as a context manager itself.
"""
return self._BindContext(self)

Copilot uses AI. Check for mistakes.
def __enter__(self) -> ClientSession:
if self._bound:
bound_session = _BoundClientSession(self, id(self._client))
self._session_token = _SESSION.set(bound_session) # type: ignore[assignment]
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self._end_session(lock=True)
if self._session_token:
_SESSION.reset(self._session_token) # type: ignore[arg-type]
self._session_token = None
self._bound = False
else:
self._end_session(lock=True)
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The binding implementation is not re-entrant for the same session: each __enter__ overwrites self._session_token, so nested with s.bind(): ... with s.bind(): ... will clear the token in the inner __exit__, and the outer __exit__ will fall through to _end_session(lock=True) unexpectedly. If bind() remains supported, store the token per-context (e.g., on a separate context manager instance or a stack) so nesting works reliably and never triggers _end_session from a bind scope.

Copilot uses AI. Check for mistakes.

@property
def client(self) -> MongoClient[Any]:
Expand Down
30 changes: 23 additions & 7 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
from pymongo.synchronous import client_session, database, uri_parser
from pymongo.synchronous.change_stream import ChangeStream, ClusterChangeStream
from pymongo.synchronous.client_bulk import _ClientBulk
from pymongo.synchronous.client_session import _EmptyServerSession
from pymongo.synchronous.client_session import _SESSION, _EmptyServerSession
from pymongo.synchronous.command_cursor import CommandCursor
from pymongo.synchronous.settings import TopologySettings
from pymongo.synchronous.topology import Topology, _ErrorContext
Expand Down Expand Up @@ -1406,7 +1406,8 @@ def start_session(
)

def _ensure_session(self, session: Optional[ClientSession] = None) -> Optional[ClientSession]:
"""If provided session is None, lend a temporary session."""
"""If provided session and bound session are None, lend a temporary session."""
session = session or self._get_bound_session()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what's going on here before the changes. How does:

if session:
    return session

lend a temporary session?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We call _ensure_session(session) in a bunch of places where session is an actual explicit session if the user passed one to the operation. The former if session: return session check is to return that explicit session if it exists instead of lending a temporary one.

if session:
return session

Expand Down Expand Up @@ -2263,11 +2264,14 @@ def _tmp_session(
self, session: Optional[client_session.ClientSession]
) -> Generator[Optional[client_session.ClientSession], None]:
"""If provided session is None, lend a temporary session."""
if session is not None:
if not isinstance(session, client_session.ClientSession):
raise ValueError(
f"'session' argument must be a ClientSession or None, not {type(session)}"
)
if session is not None and not isinstance(session, client_session.ClientSession):
raise ValueError(
f"'session' argument must be a ClientSession or None, not {type(session)}"
)

# Check for a bound session. If one exists, treat it as an explicitly passed session.
session = session or self._get_bound_session()
if session:
Comment thread
NoahStapp marked this conversation as resolved.
# Don't call end_session.
yield session
return
Expand Down Expand Up @@ -2295,6 +2299,18 @@ def _process_response(self, reply: Mapping[str, Any], session: Optional[ClientSe
if session is not None:
session._process_response(reply)

def _get_bound_session(self) -> Optional[ClientSession]:
bound_session = _SESSION.get()
if bound_session:
if bound_session.client_id == id(self):
return bound_session.session
else:
raise InvalidOperation(
"Only the client that created the bound session can perform operations within its context block. See <PLACEHOLDER> for more information."
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Third <PLACEHOLDER> maybe synchro'd

Comment thread
NoahStapp marked this conversation as resolved.
)
else:
return None

def server_info(self, session: Optional[client_session.ClientSession] = None) -> dict[str, Any]:
"""Get information about the MongoDB server we're connected to.

Expand Down
97 changes: 97 additions & 0 deletions test/asynchronous/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,52 @@ async def _test_ops(self, client, *ops):
f"{f.__name__} did not return implicit session to pool",
)

# Explicit bound session
for f, args, kw in ops:
async with client.start_session() as s:
async with s.bind():
listener.reset()
s._materialize()
last_use = s._server_session.last_use
start = time.monotonic()
self.assertLessEqual(last_use, start)
# In case "f" modifies its inputs.
args = copy.copy(args)
kw = copy.copy(kw)
await f(*args, **kw)
self.assertGreaterEqual(len(listener.started_events), 1)
for event in listener.started_events:
self.assertIn(
"lsid",
event.command,
f"{f.__name__} sent no lsid with {event.command_name}",
)

self.assertEqual(
s.session_id,
event.command["lsid"],
f"{f.__name__} sent wrong lsid with {event.command_name}",
)

self.assertFalse(s.has_ended)

self.assertTrue(s.has_ended)
with self.assertRaisesRegex(InvalidOperation, "ended session"):
async with s.bind():
await f(*args, **kw)

# Test a session cannot be used on another client.
async with self.client2.start_session() as s:
async with s.bind():
# In case "f" modifies its inputs.
args = copy.copy(args)
kw = copy.copy(kw)
with self.assertRaisesRegex(
InvalidOperation,
"Only the client that created the bound session can perform operations within its context block",
):
await f(*args, **kw)

async def test_implicit_sessions_checkout(self):
# "To confirm that implicit sessions only allocate their server session after a
# successful connection checkout" test from Driver Sessions Spec.
Expand Down Expand Up @@ -825,6 +871,57 @@ async def test_session_not_copyable(self):
async with client.start_session() as s:
self.assertRaises(TypeError, lambda: copy.copy(s))

async def test_nested_session_binding(self):
coll = self.client.pymongo_test.test
await coll.insert_one({"x": 1})

session1 = self.client.start_session()
session2 = self.client.start_session()
session1._materialize()
session2._materialize()
try:
self.listener.reset()
# Uses implicit session
await coll.find_one()
implicit_lsid = self.listener.started_events[0].command.get("lsid")
self.assertIsNotNone(implicit_lsid)
self.assertNotEqual(implicit_lsid, session1.session_id)
self.assertNotEqual(implicit_lsid, session2.session_id)

async with session1.bind():
self.listener.reset()
# Uses bound session1
await coll.find_one()
session1_lsid = self.listener.started_events[0].command.get("lsid")
self.assertEqual(session1_lsid, session1.session_id)

async with session2.bind():
self.listener.reset()
# Uses bound session2
await coll.find_one()
session2_lsid = self.listener.started_events[0].command.get("lsid")
self.assertEqual(session2_lsid, session2.session_id)
self.assertNotEqual(session2_lsid, session1.session_id)

self.listener.reset()
# Use bound session1 again
await coll.find_one()
session1_lsid = self.listener.started_events[0].command.get("lsid")
self.assertEqual(session1_lsid, session1.session_id)
self.assertNotEqual(session1_lsid, session2.session_id)

self.listener.reset()
# Uses implicit session
await coll.find_one()
implicit_lsid = self.listener.started_events[0].command.get("lsid")
self.assertIsNotNone(implicit_lsid)
self.assertNotEqual(implicit_lsid, session1.session_id)
self.assertNotEqual(implicit_lsid, session2.session_id)

finally:
await session1.end_session()
await session2.end_session()


class TestCausalConsistency(AsyncUnitTest):
listener: SessionTestListener
Expand Down
Loading
Loading