Skip to content

Commit f32f00d

Browse files
authored
Merge pull request #31 from GregoryKogan/feature/map-reduce-dev
feat: update dev vs prod documentation and enhance command-mode descr…
2 parents 49fd056 + 366053e commit f32f00d

8 files changed

Lines changed: 594 additions & 123 deletions

File tree

docs/dev-vs-prod.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,34 @@ Typical flow:
8888

8989
See [Map operations — Append output](operations/map.md) (`append: true` section).
9090

91+
Dev supports command-mode mappers only (string commands); TypedJob map legs are prod-only and run on the cluster.
92+
93+
### MapReduce (dev)
94+
95+
Typical flow:
96+
97+
1. Sandbox: `.dev/sandbox_mr_<input>-><output>/` with `input.jsonl`, `intermediate.jsonl`, and `output.jsonl`.
98+
2. Copy input JSONL into the sandbox and upload file dependencies (same as map).
99+
3. Run the mapper command as a subprocess; stdout becomes `intermediate.jsonl`.
100+
4. Sort intermediate rows by `sort_by` when set, otherwise by `reduce_by` (loads the JSONL into memory; for small local fixtures only).
101+
5. Run the reducer command; stdout becomes the output table at `.dev/<output>.jsonl`.
102+
6. Stderr for each leg: `.dev/<output_basename>_mapper.log` and `.dev/<output_basename>_reducer.log`.
103+
104+
String commands only; TypedJob map-reduce legs are prod-only (same rule as map).
105+
106+
Dev runs one mapper and one reducer process (no shuffle partitions). For command-mode reducers that expect sorted keys, dev sorting matches what the cluster provides after shuffle.
107+
108+
### Reduce (dev)
109+
110+
Typical flow:
111+
112+
1. Sandbox: `.dev/sandbox_reduce_<input>-><output>/`.
113+
2. Copy input JSONL, upload dependencies, auto-sort rows by `reduce_by` (in-memory; small fixtures only).
114+
3. Run the reducer subprocess; stdout becomes `.dev/<output>.jsonl`.
115+
4. Stderr: `.dev/<output_basename>_reducer.log`.
116+
117+
String commands only. Dev auto-sorts before the reducer, so you do not need a separate `run_sort` stage locally. In prod, the input table must already be sorted by `reduce_by`; if it is not, run a sort stage first.
118+
91119
### Vanilla (dev)
92120

93121
1. Sandbox under `.dev/<stage>_sandbox/` (name depends on stage).

docs/operations/command-mode-map-reduce.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,6 @@ If omitted, the uploader picks defaults (e.g. first existing among `reducer.py`,
7272

7373
## Dev client
7474

75-
`client_dev` does not execute map-reduce legs; it logs whether each leg would be TypedJob or command (`JsonFormat` in prod) for sanity checks.
75+
In dev mode, map-reduce and reduce run locally as subprocesses (JSONL stdin/stdout) with the same contract as command-mode map. Map-reduce runs mapper → sort (by `sort_by` when set, otherwise by `reduce_by`) → reducer. Reduce-only auto-sorts by `reduce_by` before the reducer.
76+
77+
String commands only; TypedJob legs stay prod-only. See [Dev vs prod — MapReduce and Reduce](../dev-vs-prod.md).

tests/test_client_dev.py

Lines changed: 125 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -570,63 +570,163 @@ def test_dev_client_run_yql_warns_when_input_table_jsonl_missing(
570570
)
571571

572572

573-
def test_dev_client_run_map_reduce_copies_input_jsonl_to_output(
574-
tmp_path: Path,
575-
) -> None:
573+
_DEV_MR_MAPPER = """#!/usr/bin/env python3
574+
import json
575+
import sys
576+
577+
for line in sys.stdin:
578+
row = json.loads(line)
579+
print(json.dumps({"k": row["k"], "v": int(row["v"])}), flush=True)
580+
"""
581+
582+
_DEV_MR_REDUCER = """#!/usr/bin/env python3
583+
import json
584+
import sys
585+
586+
cur_k = None
587+
s = 0
588+
589+
590+
def flush() -> None:
591+
global cur_k, s
592+
if cur_k is not None:
593+
print(json.dumps({"k": cur_k, "sum_v": s}), flush=True)
594+
595+
596+
for line in sys.stdin:
597+
row = json.loads(line)
598+
k, v = row["k"], int(row["v"])
599+
if cur_k is None:
600+
cur_k, s = k, v
601+
elif k == cur_k:
602+
s += v
603+
else:
604+
flush()
605+
cur_k, s = k, v
606+
flush()
607+
"""
608+
609+
610+
def _write_mr_job_scripts(pipeline_dir: Path) -> list[tuple[str, str]]:
611+
(pipeline_dir / "mapper.py").write_text(_DEV_MR_MAPPER, encoding="utf-8")
612+
(pipeline_dir / "reducer.py").write_text(_DEV_MR_REDUCER, encoding="utf-8")
613+
return [
614+
("//yt/mapper.py", "mapper.py"),
615+
("//yt/reducer.py", "reducer.py"),
616+
]
617+
618+
619+
def test_dev_client_run_map_reduce_sums_by_key(tmp_path: Path) -> None:
576620
client = YTDevClient(_null_logger("tests.client_dev.mr"), pipeline_dir=tmp_path)
577-
client.write_table("//tmp/mr_in", [{"row": 1}], append=False)
621+
client.write_table(
622+
"//tmp/mr_in",
623+
[
624+
{"k": "a", "v": 1},
625+
{"k": "a", "v": 2},
626+
{"k": "b", "v": 3},
627+
],
628+
append=False,
629+
)
630+
deps = _write_mr_job_scripts(tmp_path)
578631
op = client.run_map_reduce(
579632
"python3 mapper.py",
580633
"python3 reducer.py",
581634
"//tmp/mr_in",
582635
"//tmp/mr_out",
583-
["id"],
584-
[],
636+
["k"],
637+
deps,
585638
OperationResources(),
586639
{},
587640
)
588-
assert op.get_state() == "completed" and client.read_table("//tmp/mr_out") == [
589-
{"row": 1}
590-
], "dev run_map_reduce copies input table jsonl to output"
641+
assert op.get_state() == "completed", "map-reduce must finish successfully"
642+
got = {r["k"]: r["sum_v"] for r in client.read_table("//tmp/mr_out")}
643+
assert got == {"a": 3, "b": 3}, "dev map-reduce runs mapper then reducer"
591644

592645

593-
def test_dev_client_run_map_reduce_writes_empty_output_when_input_missing(
646+
def test_dev_client_run_map_reduce_raises_when_input_missing(tmp_path: Path) -> None:
647+
client = YTDevClient(
648+
_null_logger("tests.client_dev.mr_miss"), pipeline_dir=tmp_path
649+
)
650+
with pytest.raises(FileNotFoundError, match="input table file not found"):
651+
client.run_map_reduce(
652+
"python3 mapper.py",
653+
"python3 reducer.py",
654+
"//tmp/missing_in",
655+
"//tmp/mr_empty_out",
656+
["k"],
657+
[],
658+
OperationResources(),
659+
{},
660+
)
661+
662+
663+
def test_dev_client_run_map_reduce_fails_when_mapper_exits_nonzero(
594664
tmp_path: Path,
595665
) -> None:
596666
client = YTDevClient(
597-
_null_logger("tests.client_dev.mr_miss"), pipeline_dir=tmp_path
667+
_null_logger("tests.client_dev.mr_fail"), pipeline_dir=tmp_path
598668
)
669+
client.write_table("//tmp/mr_in", [{"k": "a", "v": 1}], append=False)
599670
op = client.run_map_reduce(
600-
"m",
601-
"r",
602-
"//tmp/missing_in",
603-
"//tmp/mr_empty_out",
604-
[],
605-
[],
671+
"exit 1",
672+
"python3 reducer.py",
673+
"//tmp/mr_in",
674+
"//tmp/mr_out",
675+
["k"],
676+
_write_mr_job_scripts(tmp_path),
606677
OperationResources(),
607678
{},
608679
)
609-
assert op.get_state() == "completed"
610-
assert client.read_table("//tmp/mr_empty_out") == []
680+
assert op.get_state() == "failed", "mapper failure should fail the operation"
681+
assert not client._table_local_path("//tmp/mr_out").exists(), (
682+
"reducer must not run when mapper fails"
683+
)
611684

612685

613-
def test_dev_client_run_reduce_copies_input_jsonl_to_output(
686+
def test_dev_client_run_reduce_sums_unsorted_input_after_auto_sort(
614687
tmp_path: Path,
615688
) -> None:
616689
client = YTDevClient(_null_logger("tests.client_dev.red"), pipeline_dir=tmp_path)
617-
client.write_table("//tmp/rd_in", [{"k": "v"}], append=False)
690+
client.write_table(
691+
"//tmp/rd_in",
692+
[
693+
{"k": "b", "v": 1},
694+
{"k": "a", "v": 2},
695+
{"k": "a", "v": 1},
696+
],
697+
append=False,
698+
)
699+
(tmp_path / "reducer.py").write_text(_DEV_MR_REDUCER, encoding="utf-8")
618700
op = client.run_reduce(
619701
"python3 reducer.py",
620702
"//tmp/rd_in",
621703
"//tmp/rd_out",
622704
["k"],
623-
[],
705+
[("//yt/reducer.py", "reducer.py")],
624706
OperationResources(),
625707
{},
626708
)
627-
assert op.get_state() == "completed" and client.read_table("//tmp/rd_out") == [
628-
{"k": "v"}
629-
], "dev run_reduce copies input table jsonl to output"
709+
assert op.get_state() == "completed", "reduce must finish successfully"
710+
got = {r["k"]: r["sum_v"] for r in client.read_table("//tmp/rd_out")}
711+
assert got == {"a": 3, "b": 1}, "dev reduce auto-sorts then runs reducer"
712+
713+
714+
def test_dev_client_run_map_reduce_raises_for_typed_job_legs(tmp_path: Path) -> None:
715+
client = YTDevClient(
716+
_null_logger("tests.client_dev.mr_typed"), pipeline_dir=tmp_path
717+
)
718+
client.write_table("//tmp/mr_in", [{"k": "a", "v": 1}], append=False)
719+
with pytest.raises(NotImplementedError, match="string commands"):
720+
client.run_map_reduce(
721+
TypedJob(),
722+
"python3 reducer.py",
723+
"//tmp/mr_in",
724+
"//tmp/mr_out",
725+
["k"],
726+
[],
727+
OperationResources(),
728+
{},
729+
)
630730

631731

632732
def test_dev_client_join_tables_materializes_joined_rows_in_output_jsonl(

tests/test_client_dev_runtime.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Unit tests for yt_framework.yt.support._client_dev_runtime helpers."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
from typing import TYPE_CHECKING
7+
8+
from yt_framework.yt.support._client_dev_runtime import (
9+
dev_copy_output_to_table,
10+
dev_resolve_sort_keys,
11+
dev_sort_jsonl_file,
12+
)
13+
14+
if TYPE_CHECKING:
15+
from pathlib import Path
16+
17+
18+
def test_dev_resolve_sort_keys_prefers_sort_by_when_set() -> None:
19+
keys = dev_resolve_sort_keys(reduce_by=["a"], sort_by=["b", "a"])
20+
assert keys == ["b", "a"], "sort_by wins when non-empty"
21+
22+
23+
def test_dev_resolve_sort_keys_falls_back_to_reduce_by() -> None:
24+
keys = dev_resolve_sort_keys(reduce_by=["k"], sort_by=None)
25+
assert keys == ["k"], "reduce_by used when sort_by absent"
26+
27+
28+
def test_dev_sort_jsonl_file_missing_sort_key_sorts_before_present(
29+
tmp_path: Path,
30+
) -> None:
31+
path = tmp_path / "rows.jsonl"
32+
path.write_text(
33+
'{"k":"b"}\n{}\n{"k":"a"}\n',
34+
encoding="utf-8",
35+
)
36+
dev_sort_jsonl_file(path, ["k"])
37+
rows = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]
38+
assert rows == [{}, {"k": "a"}, {"k": "b"}], (
39+
"rows missing sort key sort before rows that define the key"
40+
)
41+
42+
43+
def test_dev_sort_jsonl_file_mixed_types_do_not_raise(tmp_path: Path) -> None:
44+
path = tmp_path / "rows.jsonl"
45+
path.write_text(
46+
'{"k": 2}\n{"k": "1"}\n{"k": 10}\n',
47+
encoding="utf-8",
48+
)
49+
dev_sort_jsonl_file(path, ["k"])
50+
rows = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]
51+
assert rows == [{"k": "1"}, {"k": 10}, {"k": 2}], (
52+
"mixed types sort by JSON canonical string without TypeError"
53+
)
54+
55+
56+
def test_dev_sort_jsonl_file_multi_key_missing_secondary_key(tmp_path: Path) -> None:
57+
path = tmp_path / "rows.jsonl"
58+
path.write_text(
59+
'{"k":"a","v":2}\n{"k":"a"}\n{"k":"a","v":1}\n',
60+
encoding="utf-8",
61+
)
62+
dev_sort_jsonl_file(path, ["k", "v"])
63+
rows = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]
64+
assert rows == [{"k": "a"}, {"k": "a", "v": 1}, {"k": "a", "v": 2}], (
65+
"missing secondary sort key sorts before rows that define it"
66+
)
67+
68+
69+
def test_dev_sort_jsonl_file_orders_rows_by_keys(tmp_path: Path) -> None:
70+
path = tmp_path / "rows.jsonl"
71+
path.write_text(
72+
'{"k":"b","v":1}\n{"k":"a","v":2}\n{"k":"a","v":1}\n',
73+
encoding="utf-8",
74+
)
75+
dev_sort_jsonl_file(path, ["k"])
76+
rows = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]
77+
assert [r["k"] for r in rows] == ["a", "a", "b"], "stable sort by reduce key"
78+
79+
80+
def test_dev_sort_jsonl_file_noop_when_sort_keys_empty(tmp_path: Path) -> None:
81+
path = tmp_path / "rows.jsonl"
82+
original = '{"k":"b"}\n{"k":"a"}\n'
83+
path.write_text(original, encoding="utf-8")
84+
dev_sort_jsonl_file(path, [])
85+
assert path.read_text(encoding="utf-8") == original
86+
87+
88+
def test_dev_copy_output_to_table_skips_on_nonzero_returncode(tmp_path: Path) -> None:
89+
sandbox_output = tmp_path / "sand.jsonl"
90+
sandbox_output.write_text('{"x": 1}\n', encoding="utf-8")
91+
out_table = tmp_path / "out.jsonl"
92+
dev_copy_output_to_table(
93+
proc_returncode=1,
94+
sandbox_output=sandbox_output,
95+
output_table_local_path=out_table,
96+
)
97+
assert not out_table.exists(), "failed leg must not publish output"
98+
99+
100+
def test_dev_sort_jsonl_file_uses_sort_by_columns(tmp_path: Path) -> None:
101+
path = tmp_path / "rows.jsonl"
102+
path.write_text(
103+
'{"k":"x","ord":2}\n{"k":"x","ord":1}\n',
104+
encoding="utf-8",
105+
)
106+
dev_sort_jsonl_file(path, ["ord"])
107+
rows = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]
108+
assert [r["ord"] for r in rows] == [1, 2]

0 commit comments

Comments
 (0)