-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Python 4542 - Improved sessions API #2712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
2c0e189
5c21939
969abb2
f241650
df3040d
d6b883b
13f1a15
5edd9a0
6eaf094
b0afe91
a1234ed
e00ac18
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -139,6 +139,7 @@ | |
| import time | ||
| import uuid | ||
| from collections.abc import Mapping as _Mapping | ||
| from contextvars import ContextVar | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
|
|
@@ -153,6 +154,8 @@ | |
| TypeVar, | ||
| ) | ||
|
|
||
| from _contextvars import Token | ||
|
NoahStapp marked this conversation as resolved.
Outdated
|
||
|
|
||
| from bson.binary import Binary | ||
| from bson.int64 import Int64 | ||
| from bson.timestamp import Timestamp | ||
|
|
@@ -181,6 +184,14 @@ | |
|
|
||
| _IS_SYNC = False | ||
|
|
||
| _SESSION: ContextVar[Optional[_AsyncBoundClientSession]] = ContextVar("SESSION", default=None) | ||
|
|
||
|
|
||
| class _AsyncBoundClientSession: | ||
| def __init__(self, session: AsyncClientSession, client_id: int): | ||
| self.session = session | ||
| self.client_id = client_id | ||
|
|
||
|
|
||
| class SessionOptions: | ||
| """Options for a new :class:`AsyncClientSession`. | ||
|
|
@@ -517,6 +528,9 @@ def __init__( | |
| self._attached_to_cursor = False | ||
| # Should we leave the session alive when the cursor is closed? | ||
| self._leave_alive = False | ||
| # Is this session bound to a scope? | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I had to look up what "scope" means in this context so maybe "# Is this session bound to a context manager scope?" |
||
| self._bound = False | ||
| self._session_token: Optional[Token[_AsyncBoundClientSession]] = None | ||
|
|
||
| async def end_session(self) -> None: | ||
| """Finish this session. If a transaction has started, abort it. | ||
|
|
@@ -547,11 +561,23 @@ def _check_ended(self) -> None: | |
| if self._server_session is None: | ||
| raise InvalidOperation("Cannot use ended session") | ||
|
|
||
| def bind(self) -> AsyncClientSession: | ||
| self._bound = True | ||
| return self | ||
|
|
||
|
NoahStapp marked this conversation as resolved.
Outdated
|
||
| async def __aenter__(self) -> AsyncClientSession: | ||
| if self._bound: | ||
| bound_session = _AsyncBoundClientSession(self, id(self._client)) | ||
| self._session_token = _SESSION.set(bound_session) # type: ignore[assignment] | ||
| return self | ||
|
|
||
| async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: | ||
| await self._end_session(lock=True) | ||
| if self._session_token: | ||
| _SESSION.reset(self._session_token) # type: ignore[arg-type] | ||
| self._session_token = None | ||
| self._bound = False | ||
| else: | ||
| await self._end_session(lock=True) | ||
|
NoahStapp marked this conversation as resolved.
Outdated
|
||
|
|
||
| @property | ||
| def client(self) -> AsyncMongoClient[Any]: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,7 +65,7 @@ | |
| from pymongo.asynchronous import client_session, database, uri_parser | ||
| from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream | ||
| from pymongo.asynchronous.client_bulk import _AsyncClientBulk | ||
| from pymongo.asynchronous.client_session import _EmptyServerSession | ||
| from pymongo.asynchronous.client_session import _SESSION, _EmptyServerSession | ||
| from pymongo.asynchronous.command_cursor import AsyncCommandCursor | ||
| from pymongo.asynchronous.settings import TopologySettings | ||
| from pymongo.asynchronous.topology import Topology, _ErrorContext | ||
|
|
@@ -1408,7 +1408,8 @@ def start_session( | |
| def _ensure_session( | ||
| self, session: Optional[AsyncClientSession] = None | ||
| ) -> Optional[AsyncClientSession]: | ||
| """If provided session is None, lend a temporary session.""" | ||
| """If provided session and bound session are None, lend a temporary session.""" | ||
| session = session or self._get_bound_session() | ||
| if session: | ||
| return session | ||
|
|
||
|
|
@@ -2267,11 +2268,14 @@ async def _tmp_session( | |
| self, session: Optional[client_session.AsyncClientSession] | ||
| ) -> AsyncGenerator[Optional[client_session.AsyncClientSession], None]: | ||
| """If provided session is None, lend a temporary session.""" | ||
| if session is not None: | ||
| if not isinstance(session, client_session.AsyncClientSession): | ||
| raise ValueError( | ||
| f"'session' argument must be an AsyncClientSession or None, not {type(session)}" | ||
| ) | ||
| if session is not None and not isinstance(session, client_session.AsyncClientSession): | ||
| raise ValueError( | ||
| f"'session' argument must be an AsyncClientSession or None, not {type(session)}" | ||
| ) | ||
|
|
||
| # Check for a bound session. If one exists, treat it as an explicitly passed session. | ||
| session = session or self._get_bound_session() | ||
| if session: | ||
|
NoahStapp marked this conversation as resolved.
|
||
| # Don't call end_session. | ||
| yield session | ||
| return | ||
|
|
@@ -2301,6 +2305,18 @@ async def _process_response( | |
| if session is not None: | ||
| session._process_response(reply) | ||
|
|
||
| def _get_bound_session(self) -> Optional[AsyncClientSession]: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is encapsulated in a separate utility function because cursor operations call |
||
| bound_session = _SESSION.get() | ||
| if bound_session: | ||
| if bound_session.client_id == id(self): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we replace this with
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question about 5edd9a0, this issue is addressed in that commit correct (and if so how) ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shane's suggestion was implemented and the |
||
| return bound_session.session | ||
| else: | ||
| raise InvalidOperation( | ||
| "Only the client that created the bound session can perform operations within its context block. See <PLACEHOLDER> for more information." | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another
NoahStapp marked this conversation as resolved.
|
||
| ) | ||
| else: | ||
| return None | ||
|
|
||
| async def server_info( | ||
| self, session: Optional[client_session.AsyncClientSession] = None | ||
| ) -> dict[str, Any]: | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -139,6 +139,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import uuid | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from collections.abc import Mapping as _Mapping | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from contextvars import ContextVar | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TYPE_CHECKING, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Any, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -152,6 +153,8 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TypeVar, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from _contextvars import Token | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
NoahStapp marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from bson.binary import Binary | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from bson.int64 import Int64 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from bson.timestamp import Timestamp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -180,6 +183,14 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _IS_SYNC = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _SESSION: ContextVar[Optional[_BoundClientSession]] = ContextVar("SESSION", default=None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class _BoundClientSession: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__(self, session: ClientSession, client_id: int): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.session = session | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.client_id = client_id | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class SessionOptions: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Options for a new :class:`ClientSession`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -516,6 +527,9 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._attached_to_cursor = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Should we leave the session alive when the cursor is closed? | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._leave_alive = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Is this session bound to a scope? | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._bound = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._session_token: Optional[Token[_BoundClientSession]] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def end_session(self) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Finish this session. If a transaction has started, abort it. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -546,11 +560,23 @@ def _check_ended(self) -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._server_session is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise InvalidOperation("Cannot use ended session") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def bind(self) -> ClientSession: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._bound = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def bind(self) -> ClientSession: | |
| self._bound = True | |
| return self | |
| class _BindContext(ContextManager["ClientSession"]): | |
| """Context manager used by ClientSession.bind(). | |
| Temporarily marks the session as bound so that __enter__/__exit__ | |
| manage the bound-session ContextVar, and then restores the previous | |
| bound state on exit. | |
| """ | |
| def __init__(self, session: "ClientSession") -> None: | |
| self._session = session | |
| self._prev_bound = session._bound | |
| def __enter__(self) -> "ClientSession": | |
| # Mark the session as bound for the duration of this context and | |
| # reuse ClientSession.__enter__ to set the ContextVar. | |
| self._session._bound = True | |
| return self._session.__enter__() | |
| def __exit__( | |
| self, | |
| exc_type: Optional[Type[BaseException]], | |
| exc_val: Optional[BaseException], | |
| exc_tb: Optional["TracebackType"], | |
| ) -> None: | |
| try: | |
| # Delegate to ClientSession.__exit__ to reset the ContextVar | |
| # without ending the session when bound. | |
| self._session.__exit__(exc_type, exc_val, exc_tb) | |
| finally: | |
| # Restore the previous bound state. | |
| self._session._bound = self._prev_bound | |
| def bind(self) -> ContextManager["ClientSession"]: | |
| """Return a context manager that binds this session to the current context. | |
| Using ``with session.bind():`` will temporarily bind the session to the | |
| bound-session ContextVar without permanently changing the session's | |
| behavior when used as a context manager itself. | |
| """ | |
| return self._BindContext(self) |
Copilot
AI
Feb 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The binding implementation is not re-entrant for the same session: each __enter__ overwrites self._session_token, so nested with s.bind(): ... with s.bind(): ... will clear the token in the inner __exit__, and the outer __exit__ will fall through to _end_session(lock=True) unexpectedly. If bind() remains supported, store the token per-context (e.g., on a separate context manager instance or a stack) so nesting works reliably and never triggers _end_session from a bind scope.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -108,7 +108,7 @@ | |
| from pymongo.synchronous import client_session, database, uri_parser | ||
| from pymongo.synchronous.change_stream import ChangeStream, ClusterChangeStream | ||
| from pymongo.synchronous.client_bulk import _ClientBulk | ||
| from pymongo.synchronous.client_session import _EmptyServerSession | ||
| from pymongo.synchronous.client_session import _SESSION, _EmptyServerSession | ||
| from pymongo.synchronous.command_cursor import CommandCursor | ||
| from pymongo.synchronous.settings import TopologySettings | ||
| from pymongo.synchronous.topology import Topology, _ErrorContext | ||
|
|
@@ -1406,7 +1406,8 @@ def start_session( | |
| ) | ||
|
|
||
| def _ensure_session(self, session: Optional[ClientSession] = None) -> Optional[ClientSession]: | ||
| """If provided session is None, lend a temporary session.""" | ||
| """If provided session and bound session are None, lend a temporary session.""" | ||
| session = session or self._get_bound_session() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand what's going on here before the changes. How does: lend a temporary session?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We call |
||
| if session: | ||
| return session | ||
|
|
||
|
|
@@ -2263,11 +2264,14 @@ def _tmp_session( | |
| self, session: Optional[client_session.ClientSession] | ||
| ) -> Generator[Optional[client_session.ClientSession], None]: | ||
| """If provided session is None, lend a temporary session.""" | ||
| if session is not None: | ||
| if not isinstance(session, client_session.ClientSession): | ||
| raise ValueError( | ||
| f"'session' argument must be a ClientSession or None, not {type(session)}" | ||
| ) | ||
| if session is not None and not isinstance(session, client_session.ClientSession): | ||
| raise ValueError( | ||
| f"'session' argument must be a ClientSession or None, not {type(session)}" | ||
| ) | ||
|
|
||
| # Check for a bound session. If one exists, treat it as an explicitly passed session. | ||
| session = session or self._get_bound_session() | ||
| if session: | ||
|
NoahStapp marked this conversation as resolved.
|
||
| # Don't call end_session. | ||
| yield session | ||
| return | ||
|
|
@@ -2295,6 +2299,18 @@ def _process_response(self, reply: Mapping[str, Any], session: Optional[ClientSe | |
| if session is not None: | ||
| session._process_response(reply) | ||
|
|
||
| def _get_bound_session(self) -> Optional[ClientSession]: | ||
| bound_session = _SESSION.get() | ||
| if bound_session: | ||
| if bound_session.client_id == id(self): | ||
| return bound_session.session | ||
| else: | ||
| raise InvalidOperation( | ||
| "Only the client that created the bound session can perform operations within its context block. See <PLACEHOLDER> for more information." | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Third
NoahStapp marked this conversation as resolved.
|
||
| ) | ||
| else: | ||
| return None | ||
|
|
||
| def server_info(self, session: Optional[client_session.ClientSession] = None) -> dict[str, Any]: | ||
| """Get information about the MongoDB server we're connected to. | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<PLACEHOLDER>should be the MongoDB docs ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah once we have examples and such added to a page I'll update these spots.