Skip to content

Commit 93dcbee

Browse files
zxqfd555 authored and Manul from Pathway committed
snapshot mode in mongodb (#9805)
GitOrigin-RevId: 8f614c5074563e2a59badafd6a6d40df4394ad76
1 parent 4e3f613 commit 93dcbee

12 files changed

Lines changed: 511 additions & 152 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
77

88
### Added
99
- `pw.io.kafka.read` and `pw.io.kafka.write` connectors now support OAUTHBEARER authentication.
10+
- `pw.io.mongodb.write` connector now supports an `output_table_type` parameter with two modes: `stream_of_changes` (default) and `snapshot`. In `snapshot` mode, the connector maintains the current state of the Pathway table in MongoDB, using the `_id` field as the primary key. The `stream_of_changes` mode preserves the existing behavior: every event is written with `time` and `diff` fields, which identify the transactional minibatch and the nature of each change.
11+
1012

1113
## [0.29.0] - 2026-01-22
1214

Lines changed: 149 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,54 @@
11
import json
2+
import pathlib
3+
from typing import Literal
24

3-
from utils import MONGODB_BASE_NAME, MONGODB_CONNECTION_STRING
5+
from utils import MONGODB_BASE_NAME, MONGODB_CONNECTION_STRING, MongoDBContext
46

57
import pathway as pw
68
from pathway.internals.parse_graph import G
79

810

9-
def test_mongodb(tmp_path, mongodb):
11+
def write_items_with_connector(
12+
*,
13+
mongodb: MongoDBContext,
14+
test_items: list[dict],
15+
input_path: pathlib.Path,
16+
schema: type[pw.Schema],
17+
output_collection: str,
18+
output_table_type: Literal["stream_of_changes", "snapshot"],
19+
persistence_config: pw.persistence.Config | None = None,
20+
) -> list[dict]:
21+
G.clear()
22+
with open(input_path, "w") as f:
23+
for test_item in test_items:
24+
f.write(json.dumps(test_item) + "\n")
25+
table = pw.io.jsonlines.read(input_path, schema=schema, mode="static")
26+
pw.io.mongodb.write(
27+
table,
28+
connection_string=MONGODB_CONNECTION_STRING,
29+
database=MONGODB_BASE_NAME,
30+
collection=output_collection,
31+
output_table_type=output_table_type,
32+
)
33+
pw.run(persistence_config=persistence_config)
34+
35+
result = mongodb.get_collection(output_collection, schema.column_names())
36+
result.sort(key=lambda item: (item["name"], item["available"]))
37+
return result
38+
39+
40+
def check_special_fields(
41+
mongodb: MongoDBContext, output_collection: str, *, are_expected: bool
42+
):
43+
full_collection = mongodb.get_full_collection(output_collection)
44+
for document in full_collection:
45+
time_in_document = "time" in document
46+
diff_in_document = "diff" in document
47+
assert time_in_document == are_expected, document
48+
assert diff_in_document == are_expected, document
49+
50+
51+
def test_mongodb_stream_of_changes(tmp_path, mongodb):
1052
class InputSchema(pw.Schema):
1153
name: str
1254
count: int
@@ -15,39 +57,122 @@ class InputSchema(pw.Schema):
1557

1658
input_path = tmp_path / "input.txt"
1759
output_collection = mongodb.generate_collection_name()
60+
test_items = [
61+
{"name": "Milk", "count": 500, "price": 1.5, "available": False},
62+
{"name": "Water", "count": 600, "price": 0.5, "available": True},
63+
]
64+
result = write_items_with_connector(
65+
mongodb=mongodb,
66+
test_items=test_items,
67+
input_path=input_path,
68+
schema=InputSchema,
69+
output_collection=output_collection,
70+
output_table_type="stream_of_changes",
71+
)
72+
assert result == test_items
73+
check_special_fields(mongodb, output_collection, are_expected=True)
74+
75+
new_test_items = [{"name": "Milk", "count": 500, "price": 1.5, "available": True}]
76+
result = write_items_with_connector(
77+
mongodb=mongodb,
78+
test_items=new_test_items,
79+
input_path=input_path,
80+
schema=InputSchema,
81+
output_collection=output_collection,
82+
output_table_type="stream_of_changes",
83+
)
84+
expected_result = [
85+
{"name": "Milk", "count": 500, "price": 1.5, "available": False},
86+
{"name": "Milk", "count": 500, "price": 1.5, "available": True},
87+
{"name": "Water", "count": 600, "price": 0.5, "available": True},
88+
]
89+
assert result == expected_result
90+
check_special_fields(mongodb, output_collection, are_expected=True)
91+
1892

19-
def run(test_items: list[dict]) -> None:
20-
G.clear()
21-
with open(input_path, "w") as f:
22-
for test_item in test_items:
23-
f.write(json.dumps(test_item) + "\n")
24-
table = pw.io.jsonlines.read(input_path, schema=InputSchema, mode="static")
25-
pw.io.mongodb.write(
26-
table,
27-
connection_string=MONGODB_CONNECTION_STRING,
28-
database=MONGODB_BASE_NAME,
29-
collection=output_collection,
30-
)
31-
pw.run()
93+
def test_mongodb_snapshot(tmp_path, mongodb):
94+
class InputSchema(pw.Schema):
95+
name: str = pw.column_definition(primary_key=True)
96+
count: int
97+
price: float
98+
available: bool
99+
100+
input_path = tmp_path / "input.txt"
101+
output_collection = mongodb.generate_collection_name()
32102

33103
test_items = [
34104
{"name": "Milk", "count": 500, "price": 1.5, "available": False},
35105
{"name": "Water", "count": 600, "price": 0.5, "available": True},
36106
]
37-
run(test_items)
38-
39-
result = mongodb.get_collection(output_collection, InputSchema.column_names())
40-
result.sort(key=lambda item: (item["name"], item["available"]))
107+
result = write_items_with_connector(
108+
mongodb=mongodb,
109+
test_items=test_items,
110+
input_path=input_path,
111+
schema=InputSchema,
112+
output_collection=output_collection,
113+
output_table_type="snapshot",
114+
)
41115
assert result == test_items
116+
check_special_fields(mongodb, output_collection, are_expected=False)
42117

43118
new_test_items = [{"name": "Milk", "count": 500, "price": 1.5, "available": True}]
44-
run(new_test_items)
45-
46-
result = mongodb.get_collection(output_collection, InputSchema.column_names())
47-
result.sort(key=lambda item: (item["name"], item["available"]))
119+
result = write_items_with_connector(
120+
mongodb=mongodb,
121+
test_items=new_test_items,
122+
input_path=input_path,
123+
schema=InputSchema,
124+
output_collection=output_collection,
125+
output_table_type="snapshot",
126+
)
48127
expected_result = [
49-
{"name": "Milk", "count": 500, "price": 1.5, "available": False},
50128
{"name": "Milk", "count": 500, "price": 1.5, "available": True},
51129
{"name": "Water", "count": 600, "price": 0.5, "available": True},
52130
]
53131
assert result == expected_result
132+
check_special_fields(mongodb, output_collection, are_expected=False)
133+
134+
135+
def test_mongodb_snapshot_remove(tmp_path, mongodb):
136+
class InputSchema(pw.Schema):
137+
name: str = pw.column_definition(primary_key=True)
138+
count: int
139+
price: float
140+
available: bool
141+
142+
input_path = tmp_path / "input.txt"
143+
pstorage_path = tmp_path / "PStorage"
144+
persistence_config = pw.persistence.Config(
145+
backend=pw.persistence.Backend.filesystem(pstorage_path)
146+
)
147+
148+
output_collection = mongodb.generate_collection_name()
149+
test_items = [
150+
{"name": "Milk", "count": 500, "price": 1.5, "available": False},
151+
{"name": "Water", "count": 600, "price": 0.5, "available": True},
152+
]
153+
result = write_items_with_connector(
154+
mongodb=mongodb,
155+
test_items=test_items,
156+
input_path=input_path,
157+
schema=InputSchema,
158+
output_collection=output_collection,
159+
output_table_type="snapshot",
160+
persistence_config=persistence_config,
161+
)
162+
assert result == test_items
163+
check_special_fields(mongodb, output_collection, are_expected=False)
164+
165+
test_items = [
166+
{"name": "Water", "count": 600, "price": 0.5, "available": True},
167+
]
168+
result = write_items_with_connector(
169+
mongodb=mongodb,
170+
test_items=test_items,
171+
input_path=input_path,
172+
schema=InputSchema,
173+
output_collection=output_collection,
174+
output_table_type="snapshot",
175+
persistence_config=persistence_config,
176+
)
177+
assert result == test_items
178+
check_special_fields(mongodb, output_collection, are_expected=False)

integration_tests/db_connectors/utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,13 @@ def generate_collection_name(self) -> str:
256256
table_name = f'mongodb_{str(uuid.uuid4()).replace("-", "")}'
257257
return table_name
258258

259+
def get_full_collection(
260+
self, collection_name: str
261+
) -> list[dict[str, str | int | bool | float]]:
262+
db = self.client[MONGODB_BASE_NAME]
263+
collection = db[collection_name]
264+
return [i for i in collection.find()] # cast to list
265+
259266
def get_collection(
260267
self, collection_name: str, field_names: list[str]
261268
) -> list[dict[str, str | int | bool | float]]:

python/pathway/io/_utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
STATIC_MODE_NAME = "static"
2929
STREAMING_MODE_NAME = "streaming"
3030
SNAPSHOT_MODE_NAME = "streaming_with_deletions" # deprecated
31+
SNAPSHOT_OUTPUT_TABLE_TYPE = "snapshot"
3132

3233
METADATA_COLUMN_NAME = "_metadata"
3334
MESSAGE_QUEUE_KEY_COLUMN_NAME = "key"

python/pathway/io/deltalake/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from pathway.internals.table_io import table_from_datasource
2626
from pathway.internals.trace import trace_user_frame
2727
from pathway.io._utils import (
28+
SNAPSHOT_OUTPUT_TABLE_TYPE,
2829
_get_unique_name,
2930
_prepare_s3_connection_settings,
3031
internal_connector_mode,
@@ -34,7 +35,6 @@
3435
from pathway.io.s3 import DigitalOceanS3Settings, WasabiS3Settings
3536

3637
_PATHWAY_COLUMN_META_FIELD = "pathway.column.metadata"
37-
_SNAPSHOT_OUTPUT_TABLE_TYPE = "snapshot"
3838
_DELTA_LOG_REL_PATH = "_delta_log"
3939
_LAST_CHECKPOINT_BLOCK_NAME = "_last_checkpoint"
4040
_CHECKPOINT_EXTENSION = ".checkpoint.parquet"
@@ -618,7 +618,7 @@ def write(
618618
),
619619
min_commit_frequency=min_commit_frequency,
620620
partition_columns=prepared_partition_columns,
621-
snapshot_maintenance_on_output=output_table_type == _SNAPSHOT_OUTPUT_TABLE_TYPE,
621+
snapshot_maintenance_on_output=output_table_type == SNAPSHOT_OUTPUT_TABLE_TYPE,
622622
delta_optimizer_rule=(table_optimizer.engine_rule if table_optimizer else None),
623623
)
624624
data_format = api.DataFormat(

python/pathway/io/mongodb/__init__.py

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22

33
from __future__ import annotations
44

5-
from typing import Iterable
5+
from typing import Iterable, Literal
66

77
from pathway.internals import api, datasink
88
from pathway.internals._io_helpers import _format_output_value_fields
99
from pathway.internals.expression import ColumnReference
1010
from pathway.internals.runtime_type_check import check_arg_types
1111
from pathway.internals.table import Table
1212
from pathway.internals.trace import trace_user_frame
13+
from pathway.io._utils import SNAPSHOT_OUTPUT_TABLE_TYPE
1314

1415

1516
@check_arg_types
@@ -20,20 +21,35 @@ def write(
2021
connection_string: str,
2122
database: str,
2223
collection: str,
24+
output_table_type: Literal["stream_of_changes", "snapshot"] = "stream_of_changes",
2325
max_batch_size: int | None = None,
2426
name: str | None = None,
2527
sort_by: Iterable[ColumnReference] | None = None,
2628
) -> None:
27-
"""Writes ``table``'s stream of updates to a MongoDB table.
29+
"""Writes ``table`` to a MongoDB table.
30+
31+
The output table supports two formats, controlled by the ``output_table_type``
32+
parameter.
33+
34+
The ``stream_of_changes`` format provides a complete history of all modifications
35+
applied to the table. Each entry contains the full data row along with two
36+
additional fields: ``time`` and ``diff``. The ``time`` field identifies the
37+
transactional minibatch in which the change occurred, while ``diff`` describes
38+
the nature of the change: ``diff = 1`` indicates that the row was inserted into
39+
the Pathway table, and ``diff = -1`` indicates that the row was removed. Row
40+
updates are represented as two events within the same transactional minibatch:
41+
first the old version of the row with ``diff = -1``, followed by the new version
42+
with ``diff = 1``. This format is used by default.
43+
44+
The ``snapshot`` format maintains the current state of the Pathway table in the
45+
output. The table's primary key is stored in the ``_id`` field. When a change
46+
occurs, no additional metadata fields are added; instead, the engine locates the
47+
corresponding row by ``_id`` and applies the update directly. As a result, the
48+
output table always reflects the latest state of the Pathway table.
2849
2950
If the specified database or table doesn't exist, it will be created during the
3051
first write.
3152
32-
The entries in the resulting table will have two additional fields: ``time``
33-
and ``diff``. In particular, ``time`` is a processing time of a row
34-
and ``diff`` shows the nature of the change: ``1`` means a row was added and ``-1``
35-
means a row was deleted.
36-
3753
**Note:** Since MongoDB
3854
`stores DateTime in milliseconds <https://www.mongodb.com/docs/manual/reference/bson-types/#date>`_,
3955
the `Duration </developers/api-docs/pathway/#pathway.Duration>`_ type is also
@@ -46,6 +62,8 @@ def write(
4662
for the details.
4763
database: The name of the database to update.
4864
collection: The name of the collection to write to.
65+
output_table_type: The type of the output table, defining whether a current snapshot
66+
or a history of modifications must be maintained.
4967
max_batch_size: The maximum number of entries to insert in one batch.
5068
name: A unique name for the connector. If provided, this name will be used in
5169
logs and monitoring dashboards.
@@ -180,25 +198,68 @@ def write(
180198
For more advanced setups, such as replica sets, authentication, or custom
181199
read/write concerns, refer to the official MongoDB documentation on
182200
`connection strings <https://www.mongodb.com/docs/manual/reference/connection-string/>`_
201+
202+
Note that if you do not need the full history of modifications, you can use the
203+
``snapshot`` output table type. In this case, the connector configuration would
204+
look as follows:
205+
206+
>>> pw.io.mongodb.write(
207+
... pet_owners,
208+
... connection_string="mongodb://127.0.0.1:27017/",
209+
... database="pathway-test",
210+
... collection="pet-owners",
211+
... output_table_type="snapshot",
212+
... )
213+
214+
The resulting output will be as follows:
215+
216+
.. code-block:: rst
217+
218+
[
219+
{
220+
_id: ObjectId('67180150d94db90697c07853'),
221+
age: Long('9'),
222+
owner: 'Bob',
223+
pet: 'cat',
224+
},
225+
{
226+
_id: ObjectId('67180150d94db90697c07854'),
227+
age: Long('8'),
228+
owner: 'Alice',
229+
pet: 'cat',
230+
},
231+
{
232+
_id: ObjectId('67180150d94db90697c07855'),
233+
age: Long('10'),
234+
owner: 'Alice',
235+
pet: 'dog',
236+
}
237+
]
183238
"""
239+
is_snapshot_mode = output_table_type == SNAPSHOT_OUTPUT_TABLE_TYPE
184240
data_storage = api.DataStorage(
185241
storage_type="mongodb",
186242
connection_string=connection_string,
187243
database=database,
188244
table_name=collection,
189245
max_batch_size=max_batch_size,
246+
snapshot_maintenance_on_output=is_snapshot_mode,
190247
)
191248
data_format = api.DataFormat(
192249
format_type="bson",
193250
key_field_names=[],
194251
value_fields=_format_output_value_fields(table),
252+
with_special_fields=not is_snapshot_mode,
195253
)
196254

255+
datasink_type = (
256+
"snapshot" if output_table_type == SNAPSHOT_OUTPUT_TABLE_TYPE else "sink"
257+
)
197258
table.to(
198259
datasink.GenericDataSink(
199260
data_storage,
200261
data_format,
201-
datasink_name="mongodb.sink",
262+
datasink_name=f"mongodb.{datasink_type}",
202263
unique_name=name,
203264
sort_by=sort_by,
204265
)

0 commit comments

Comments
 (0)