Skip to content

Commit 1c828e5

Browse files
authored
Document and test multiprocessing pattern for concurrent XML loading (#58)
Adds a test_multiprocessing.py test (parallel parse + serialised DuckDB writes via multiprocessing.Lock, with XML roundtrip content assertion) and a matching example in the API overview docs. Each worker creates its own DataModel with a unique temp_prefix so temp tables never collide. Bumps version to 0.13.1.
1 parent 43ef530 commit 1c828e5

3 files changed

Lines changed: 183 additions & 1 deletion

File tree

docs/api/overview.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,74 @@ flowchart TB
5353
end
5454
```
5555

56+
### Multiprocessing example
57+
58+
XML parsing is CPU-bound and scales well across processes. Loading into the
59+
database, however, must be coordinated to avoid conflicts on shared tables.
60+
The right level of synchronisation depends on the backend:
61+
62+
* **DuckDB (file-based)** — only one active writer is allowed at a time, so
63+
all database I/O must be serialised.
64+
* **PostgreSQL, MS SQL Server, …** — concurrent writes to *different* temp
65+
tables are safe (each process gets a unique temp-table prefix), but the final
66+
merge into the shared target tables should be serialised.
67+
68+
The simplest approach — and the one shown below — is to serialise the entire
69+
database phase with a `multiprocessing.Lock`, keeping only the parsing step
70+
parallel. This works correctly for all backends.
71+
72+
```python
73+
import multiprocessing
74+
from xml2db import DataModel
75+
76+
77+
def load_one_file(xml_path, xsd_path, connection_string, lock):
78+
# Each process creates its own DataModel with a unique temp_prefix.
79+
model = DataModel(
80+
xsd_file=xsd_path,
81+
connection_string=connection_string,
82+
)
83+
# XML parsing is CPU-bound and runs in parallel across all processes.
84+
doc = model.parse_xml(xml_path)
85+
86+
# Serialise all database I/O across processes.
87+
with lock:
88+
doc.insert_into_target_tables()
89+
model.engine.dispose()
90+
91+
92+
if __name__ == "__main__":
93+
xsd_path = "schema.xsd"
94+
connection_string = "duckdb:///data.duckdb"
95+
xml_files = ["file1.xml", "file2.xml", "file3.xml"]
96+
97+
lock = multiprocessing.Lock()
98+
processes = [
99+
multiprocessing.Process(
100+
target=load_one_file,
101+
args=(xml_path, xsd_path, connection_string, lock),
102+
)
103+
for xml_path in xml_files
104+
]
105+
for p in processes:
106+
p.start()
107+
for p in processes:
108+
p.join()
109+
if p.exitcode != 0:
110+
raise RuntimeError(f"Worker failed with exit code {p.exitcode}")
111+
```
112+
113+
!!! Note
114+
For backends that support concurrent writers, you can increase throughput
115+
by splitting
116+
[`Document.insert_into_target_tables`](document.md/#xml2db.document.Document.insert_into_target_tables)
117+
into separate calls to
118+
[`Document.insert_into_temp_tables`](document.md/#xml2db.document.Document.insert_into_temp_tables)
119+
(run concurrently — each process has a unique temp-table prefix so there
120+
are no collisions) and
121+
[`Document.merge_into_target_tables`](document.md/#xml2db.document.Document.merge_into_target_tables)
122+
(serialised via lock).
123+
56124
## *Advanced use:* get data from the database back to XML
57125

58126
The flow chart below presents data conversions used to get back data from the database into XML, showing the functions

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "xml2db"
7-
version = "0.13.0"
7+
version = "0.13.1"
88
authors = [
99
{ name="Commission de régulation de l'énergie", email="opensource@cre.fr" },
1010
]

tests/test_multiprocessing.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""Tests for concurrent XML loading with multiprocessing and a file-based DuckDB."""
2+
import multiprocessing
3+
import os
4+
import tempfile
5+
6+
import pytest
7+
from lxml import etree
8+
9+
pytest.importorskip("duckdb", reason="duckdb not installed")
10+
11+
from sqlalchemy import String, create_engine, text
12+
13+
from xml2db import DataModel
14+
15+
_SAMPLE = os.path.join(os.path.dirname(__file__), "sample_models", "orders")
16+
_XSD = os.path.join(_SAMPLE, "orders.xsd")
17+
_XML_FILES = [
18+
os.path.join(_SAMPLE, "xml", f"order{i}.xml") for i in (1, 2, 3)
19+
]
20+
21+
# Matches orders model version 0 in sample_models/models.py so that the XML
22+
# roundtrip produces byte-for-byte identical output.
23+
_MODEL_CONFIG = {
24+
"tables": {
25+
"shiporder": {"fields": {"orderperson": {"transform": False}}},
26+
"item": None,
27+
},
28+
"record_hash_column_name": "record_hash",
29+
"metadata_columns": [
30+
{"name": "input_file_path", "type": String(256)},
31+
],
32+
}
33+
34+
35+
def _load_xml_file(xml_path: str, xsd_path: str, db_path: str, lock) -> None:
36+
"""Worker function: parse one XML file and load it into a shared DuckDB file.
37+
38+
Each process builds its own DataModel (and gets a unique temp_prefix UUID),
39+
so temporary tables never collide. All database I/O is serialised via *lock*
40+
because DuckDB allows only one active writer at a time.
41+
"""
42+
model = DataModel(
43+
xsd_file=xsd_path,
44+
connection_string=f"duckdb:///{db_path}",
45+
model_config=_MODEL_CONFIG,
46+
)
47+
# CPU-bound XML parsing runs in parallel across processes.
48+
doc = model.parse_xml(xml_path, metadata={"input_file_path": xml_path})
49+
50+
# Serialise all database access: one writer at a time for DuckDB.
51+
with lock:
52+
doc.insert_into_target_tables()
53+
# Dispose inside the lock so the file handle is released before
54+
# the next process tries to open the database.
55+
model.engine.dispose()
56+
57+
58+
def test_multiprocessing_file_duckdb():
59+
"""Three worker processes load XML files concurrently into a file-based DuckDB.
60+
61+
Parsing happens in parallel; database writes are serialised via a
62+
multiprocessing.Lock. After all workers finish:
63+
- the target table must contain one row per XML file, and
64+
- each file must round-trip back to identical XML (content assertion).
65+
"""
66+
with tempfile.TemporaryDirectory() as tmpdir:
67+
db_path = os.path.join(tmpdir, "test.duckdb")
68+
lock = multiprocessing.Lock()
69+
70+
processes = [
71+
multiprocessing.Process(
72+
target=_load_xml_file,
73+
args=(xml_path, _XSD, db_path, lock),
74+
)
75+
for xml_path in _XML_FILES
76+
]
77+
for p in processes:
78+
p.start()
79+
for p in processes:
80+
p.join()
81+
assert p.exitcode == 0, (
82+
f"Worker for {_XML_FILES[processes.index(p)]} "
83+
f"exited with code {p.exitcode}"
84+
)
85+
86+
# --- row count ---
87+
engine = create_engine(f"duckdb:///{db_path}")
88+
with engine.connect() as conn:
89+
count = conn.execute(text("SELECT COUNT(*) FROM orders")).scalar()
90+
engine.dispose()
91+
assert count == len(_XML_FILES)
92+
93+
# --- content roundtrip ---
94+
verify_model = DataModel(
95+
xsd_file=_XSD,
96+
connection_string=f"duckdb:///{db_path}",
97+
model_config=_MODEL_CONFIG,
98+
)
99+
for xml_path in _XML_FILES:
100+
doc = verify_model.extract_from_database(
101+
f"input_file_path='{xml_path}'",
102+
force_tz="Europe/Paris",
103+
)
104+
src = etree.parse(xml_path).getroot()
105+
el = doc.to_xml(nsmap=src.nsmap)
106+
for key, val in src.attrib.items():
107+
el.set(key, val)
108+
actual = etree.tostring(
109+
el, pretty_print=True, encoding="utf-8", xml_declaration=True
110+
).decode("utf-8")
111+
with open(xml_path) as f:
112+
expected = f.read()
113+
assert actual == expected, f"XML roundtrip failed for {xml_path}"
114+
verify_model.engine.dispose()

0 commit comments

Comments
 (0)