Skip to content

Commit 9f46dfe

Browse files
authored
Merge branch 'master' into spec-resync-06-22-2026
2 parents 4b5b9f8 + f65a451 commit 9f46dfe

26 files changed

Lines changed: 1516 additions & 1778 deletions

.evergreen/config.yml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@ post:
4545
- func: "cleanup"
4646

4747
tasks:
48+
- name: linkcheck
49+
commands:
50+
- command: subprocess.exec
51+
type: test
52+
params:
53+
binary: bash
54+
args:
55+
- .evergreen/just.sh
56+
- docs-linkcheck
57+
working_dir: src
58+
4859
- name: resync_specs
4960
commands:
5061
- command: subprocess.exec
@@ -56,6 +67,14 @@ tasks:
5667
working_dir: src
5768

5869
buildvariants:
70+
- name: linkcheck
71+
display_name: "Link Check"
72+
run_on: rhel80-small
73+
cron: '0 0 * * *'
74+
patchable: true
75+
tasks:
76+
- name: linkcheck
77+
5978
- name: resync_specs
6079
display_name: "Resync Specs"
6180
run_on: rhel80-small

.github/workflows/test-python.yml

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ jobs:
4646
- name: Run typecheck
4747
run: |
4848
just typing
49+
- name: Run synchro smoke test
50+
run: |
51+
uv run --extra test --with unasync pytest tools/test_synchro.py -v --override-ini="addopts="
4952
- run: |
5053
sudo apt-get install -y cppcheck
5154
- run: |
@@ -107,7 +110,7 @@ jobs:
107110
- name: Generate xml report
108111
run: uv tool run --with "coverage[toml]" coverage xml
109112
- name: Upload test results to Codecov
110-
uses: codecov/codecov-action@e79a6962e0d4c0c17b229090214935d2e33f8354 # v6.0.1
113+
uses: codecov/codecov-action@fb8b3582c8e4def4969c97caa2f19720cb33a72f # v7.0.0
111114
with:
112115
token: ${{ secrets.CODECOV_TOKEN }}
113116
doctest:
@@ -154,27 +157,6 @@ jobs:
154157
- name: Build docs
155158
run: just docs
156159

157-
linkcheck:
158-
name: Link Check
159-
runs-on: ubuntu-latest
160-
steps:
161-
- uses: actions/checkout@v6.0.3
162-
with:
163-
persist-credentials: false
164-
- name: Install uv
165-
uses: astral-sh/setup-uv@fac544c07dec837d0ccb6301d7b5580bf5edae39 # v8.2.0
166-
with:
167-
enable-cache: true
168-
python-version: "3.10"
169-
- name: Install just
170-
run: uv tool install rust-just
171-
- name: Install dependencies
172-
run: just install
173-
- name: Build docs
174-
run: just docs-linkcheck
175-
env:
176-
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
177-
178160
typing:
179161
name: Typing Tests
180162
runs-on: ubuntu-latest

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ expansion.yml
3030
.evergreen/scripts/test-env.sh
3131
specifications/
3232
results.json
33+
.synchro-modified
3334
.evergreen/atlas_x509_dev_client_certificate.pem
3435

3536
# Lambda temp files

.pre-commit-config.yaml

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,36 @@ repos:
1818
exclude: .patch
1919
exclude_types: [json]
2020

21-
- repo: https://github.com/astral-sh/ruff-pre-commit
22-
# Ruff version.
23-
rev: v0.15.16
21+
- repo: local
2422
hooks:
2523
- id: ruff
26-
args: ["--fix", "--show-fixes"]
24+
name: ruff
25+
entry: uv run --group lint ruff check --fix --show-fixes
26+
language: system
27+
types_or: [python, pyi]
28+
require_serial: true
2729
- id: ruff-format
30+
name: ruff-format
31+
entry: uv run --group lint ruff format
32+
language: system
33+
types_or: [python, pyi]
34+
require_serial: true
2835

2936
- repo: local
3037
hooks:
3138
- id: synchro
3239
name: synchro
33-
entry: bash ./tools/synchro.sh
34-
language: python
40+
entry: uv run --group unasync ./tools/synchro.py
41+
language: system
42+
require_serial: true
43+
always_run: true
44+
- id: synchro-stage-files
45+
name: synchro-stage-files
46+
entry: bash -c 'if [ -s .synchro-modified ]; then xargs git add < .synchro-modified || true; fi'
47+
language: system
3548
require_serial: true
36-
fail_fast: true
37-
additional_dependencies:
38-
- ruff==0.15.16
39-
- unasync
49+
always_run: true
50+
pass_filenames: false
4051

4152
- repo: https://github.com/adamchainz/blacken-docs
4253
rev: "1.16.0"

doc/conf.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,8 @@
9898
linkcheck_workers = 1
9999
# Give slow redirects more time before retrying.
100100
linkcheck_timeout = 60
101-
102-
# Ignore anchors in links since they may not be added to the page right away.
103-
linkcheck_anchors_ignore_for_url = [r"https://github.com/.*"]
101+
# Ignore anchors in links
102+
linkcheck_anchors = False
104103

105104
# Pass GitHub token to avoid rate-limiting on GitHub links.
106105
if github_token := os.environ.get("GITHUB_TOKEN"):

pymongo/asynchronous/bulk.py

Lines changed: 30 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
from __future__ import annotations
2121

2222
import copy
23-
import datetime
24-
import logging
2523
from collections.abc import Iterator, Mapping, MutableMapping
2624
from itertools import islice
2725
from typing import (
@@ -34,9 +32,9 @@
3432
from bson.objectid import ObjectId
3533
from bson.raw_bson import RawBSONDocument
3634
from pymongo import _csot, common
37-
from pymongo.asynchronous.client_session import (
38-
AsyncClientSession,
39-
_validate_session_write_concern,
35+
from pymongo.asynchronous.client_session import AsyncClientSession, _validate_session_write_concern
36+
from pymongo.asynchronous.command_runner import (
37+
run_bulk_write_command,
4038
)
4139
from pymongo.asynchronous.helpers import _handle_reauth
4240
from pymongo.bulk_shared import (
@@ -54,18 +52,14 @@
5452
from pymongo.errors import (
5553
ConfigurationError,
5654
InvalidOperation,
57-
NotPrimaryError,
5855
OperationFailure,
5956
)
6057
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
61-
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
6258
from pymongo.message import (
6359
_DELETE,
6460
_INSERT,
6561
_UPDATE,
6662
_BulkWriteContext,
67-
_convert_exception,
68-
_convert_write_result,
6963
_EncryptedBulkWriteContext,
7064
_randint,
7165
)
@@ -251,83 +245,16 @@ async def write_command(
251245
docs: list[Mapping[str, Any]],
252246
client: AsyncMongoClient[Any],
253247
) -> dict[str, Any]:
254-
"""A proxy for SocketInfo.write_command that handles event publishing."""
248+
"""Run a batch write command, returning the response as a dict."""
255249
cmd[bwc.field] = docs
256-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
257-
_debug_log(
258-
_COMMAND_LOGGER,
259-
message=_CommandStatusMessage.STARTED,
260-
clientId=client._topology_settings._topology_id,
261-
command=cmd,
262-
commandName=next(iter(cmd)),
263-
databaseName=bwc.db_name,
264-
requestId=request_id,
265-
operationId=request_id,
266-
driverConnectionId=bwc.conn.id,
267-
serverConnectionId=bwc.conn.server_connection_id,
268-
serverHost=bwc.conn.address[0],
269-
serverPort=bwc.conn.address[1],
270-
serviceId=bwc.conn.service_id,
271-
)
272-
if bwc.publish:
273-
bwc._start(cmd, request_id, docs)
274-
try:
275-
if bwc.session is not None and bwc.session._starting_transaction:
276-
bwc.session._transaction.set_in_progress()
277-
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc]
278-
duration = datetime.datetime.now() - bwc.start_time
279-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
280-
_debug_log(
281-
_COMMAND_LOGGER,
282-
message=_CommandStatusMessage.SUCCEEDED,
283-
clientId=client._topology_settings._topology_id,
284-
durationMS=duration,
285-
reply=reply,
286-
commandName=next(iter(cmd)),
287-
databaseName=bwc.db_name,
288-
requestId=request_id,
289-
operationId=request_id,
290-
driverConnectionId=bwc.conn.id,
291-
serverConnectionId=bwc.conn.server_connection_id,
292-
serverHost=bwc.conn.address[0],
293-
serverPort=bwc.conn.address[1],
294-
serviceId=bwc.conn.service_id,
295-
)
296-
if bwc.publish:
297-
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
298-
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
299-
except Exception as exc:
300-
duration = datetime.datetime.now() - bwc.start_time
301-
if isinstance(exc, (NotPrimaryError, OperationFailure)):
302-
failure: _DocumentOut = exc.details # type: ignore[assignment]
303-
else:
304-
failure = _convert_exception(exc)
305-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
306-
_debug_log(
307-
_COMMAND_LOGGER,
308-
message=_CommandStatusMessage.FAILED,
309-
clientId=client._topology_settings._topology_id,
310-
durationMS=duration,
311-
failure=failure,
312-
commandName=next(iter(cmd)),
313-
databaseName=bwc.db_name,
314-
requestId=request_id,
315-
operationId=request_id,
316-
driverConnectionId=bwc.conn.id,
317-
serverConnectionId=bwc.conn.server_connection_id,
318-
serverHost=bwc.conn.address[0],
319-
serverPort=bwc.conn.address[1],
320-
serviceId=bwc.conn.service_id,
321-
isServerSideError=isinstance(exc, OperationFailure),
322-
)
323-
324-
if bwc.publish:
325-
bwc._fail(request_id, failure, duration)
326-
# Process the response from the server.
327-
if isinstance(exc, (NotPrimaryError, OperationFailure)):
328-
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
329-
raise
330-
return reply # type: ignore[return-value]
250+
result_docs, _, _ = await run_bulk_write_command(
251+
bwc,
252+
cmd,
253+
request_id,
254+
msg,
255+
client=client,
256+
)
257+
return result_docs[0]
331258

332259
async def unack_write(
333260
self,
@@ -339,83 +266,23 @@ async def unack_write(
339266
docs: list[Mapping[str, Any]],
340267
client: AsyncMongoClient[Any],
341268
) -> Optional[Mapping[str, Any]]:
342-
"""A proxy for AsyncConnection.unack_write that handles event publishing."""
343-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
344-
_debug_log(
345-
_COMMAND_LOGGER,
346-
message=_CommandStatusMessage.STARTED,
347-
clientId=client._topology_settings._topology_id,
348-
command=cmd,
349-
commandName=next(iter(cmd)),
350-
databaseName=bwc.db_name,
351-
requestId=request_id,
352-
operationId=request_id,
353-
driverConnectionId=bwc.conn.id,
354-
serverConnectionId=bwc.conn.server_connection_id,
355-
serverHost=bwc.conn.address[0],
356-
serverPort=bwc.conn.address[1],
357-
serviceId=bwc.conn.service_id,
358-
)
359-
if bwc.publish:
360-
cmd = bwc._start(cmd, request_id, docs)
361-
try:
362-
result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override]
363-
duration = datetime.datetime.now() - bwc.start_time
364-
if result is not None:
365-
reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type]
366-
else:
367-
# Comply with APM spec.
368-
reply = {"ok": 1}
369-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
370-
_debug_log(
371-
_COMMAND_LOGGER,
372-
message=_CommandStatusMessage.SUCCEEDED,
373-
clientId=client._topology_settings._topology_id,
374-
durationMS=duration,
375-
reply=reply,
376-
commandName=next(iter(cmd)),
377-
databaseName=bwc.db_name,
378-
requestId=request_id,
379-
operationId=request_id,
380-
driverConnectionId=bwc.conn.id,
381-
serverConnectionId=bwc.conn.server_connection_id,
382-
serverHost=bwc.conn.address[0],
383-
serverPort=bwc.conn.address[1],
384-
serviceId=bwc.conn.service_id,
385-
)
386-
if bwc.publish:
387-
bwc._succeed(request_id, reply, duration)
388-
except Exception as exc:
389-
duration = datetime.datetime.now() - bwc.start_time
390-
if isinstance(exc, OperationFailure):
391-
failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type]
392-
elif isinstance(exc, NotPrimaryError):
393-
failure = exc.details # type: ignore[assignment]
394-
else:
395-
failure = _convert_exception(exc)
396-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
397-
_debug_log(
398-
_COMMAND_LOGGER,
399-
message=_CommandStatusMessage.FAILED,
400-
clientId=client._topology_settings._topology_id,
401-
durationMS=duration,
402-
failure=failure,
403-
commandName=next(iter(cmd)),
404-
databaseName=bwc.db_name,
405-
requestId=request_id,
406-
operationId=request_id,
407-
driverConnectionId=bwc.conn.id,
408-
serverConnectionId=bwc.conn.server_connection_id,
409-
serverHost=bwc.conn.address[0],
410-
serverPort=bwc.conn.address[1],
411-
serviceId=bwc.conn.service_id,
412-
isServerSideError=isinstance(exc, OperationFailure),
413-
)
414-
if bwc.publish:
415-
assert bwc.start_time is not None
416-
bwc._fail(request_id, failure, duration)
417-
raise
418-
return result # type: ignore[return-value]
269+
"""Send an unacknowledged batch write command."""
270+
# Historically the STARTED log omits the documents while the published
271+
# CommandStartedEvent includes them, so log ``cmd`` but publish a copy
272+
# carrying the ``docs`` field.
273+
published = dict(cmd)
274+
published[bwc.field] = docs
275+
await run_bulk_write_command(
276+
bwc,
277+
cmd,
278+
request_id,
279+
msg,
280+
client=client,
281+
orig=published,
282+
max_doc_size=max_doc_size,
283+
unacknowledged=True,
284+
)
285+
return None
419286

420287
async def _execute_batch_unack(
421288
self,
@@ -487,7 +354,7 @@ async def _execute_command(
487354
run = self.current_run
488355

489356
# AsyncConnection.command validates the session, but we use
490-
# AsyncConnection.write_command
357+
# run_bulk_write_command.
491358
conn.validate_session(client, session)
492359
last_run = False
493360

0 commit comments

Comments
 (0)