Skip to content

Commit 878eb8a

Browse files
authored
feat(amber): add loop-bookkeeping columns to materialized State (dormant) (#5900)
### What changes were proposed in this PR? Extends the cross-region **State materialization** format from a single `content` column to **3 columns** — `content`, `loop_counter`, `loop_start_id` — promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: the `OutputManager` state writer + `emit_state`, the Python network sender/receiver, the materialization reader, and the Scala `state.toTuple` call sites. In memory the two loop fields ride on the `StateFrame` envelope; they are materialized/serialized as their own columns (parallel to `content`), and `from_tuple` / `fromTuple` read only `content` back into the `State`. The loop-back write address (LoopStart's input-port URI) is **intentionally not** carried in State. It's constant per-execution config, not per-iteration data, so it will be delivered to Loop End at **setup** in the loop PR rather than round-tripping through every State row. (An earlier revision of this PR carried a `loop_start_state_uri` column; it was dropped after review — better than shipping a dormant column and removing it later.) On the Python side the column-name → value mapping is defined once in `State.to_columns` and reused by both `to_tuple` (iceberg) and the network sender's `StateFrame` branch, so adding a column later is a single-line change in one place rather than an edit in every serializer. **Dormant on `main`** — nothing observable changes without the loop operators: - `to_tuple()` / `toTuple()` and `OutputManager.save_state_to_storage_if_needed` / `emit_state` default the two loop columns to `0` / `""`, so every existing non-loop caller is unchanged. - `from_tuple` / `fromTuple` read only the `content` column, so round-trip identity is preserved and the extra columns are inert. No backward-compatible read of old 1-column State is needed: State materialization is **intra-execution only** — the iceberg state-document URI is execution-scoped (`…/eid/{executionId}/`) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 3-column reader. This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR). ### Any related issues, documentation, discussions? Extracted from #5700 (loop operators) per @Xiao-zhen-Liu's split request; part of #4442 ("Introduce for loop"). ### How was this PR tested? - **Format / round-trip:** `test_state.py` (loop columns are their own columns, never in content JSON, default to `0` / `""`), Scala `StateSpec` (both loop columns round-trip through a tuple with non-default values, not just `content`), `ArrowUtilsSpec` (3-column Arrow vector round-trip), `IcebergDocumentSpec` (iceberg state-doc round-trip). - **Transport:** `test_network_receiver.py`, `test_input_port_materialization_reader_runnable.py`, and `test_state_materialization_e2e.py` — the e2e (hermetic sqlite catalog) writes non-default values for both loop columns end-to-end and asserts they replay both on the `StateFrame` and on the raw iceberg row, exercising the real Tuple/Schema/iceberg path. - **Dormancy:** `test_output_manager.py::test_defaults_loop_columns_when_omitted` pins that a no-loop caller (no `loop_counter`) still produces a valid 3-column tuple with the loop columns at `0` / `""`. - Local: `workflow-core` + `amber` compile; `StateSpec` + `ArrowUtilsSpec` pass; Python state + transport + e2e tests pass; scalafmt + scalafix + black clean. (`IcebergDocumentSpec` needs the iceberg catalog backend, so it runs in CI.) ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF.
1 parent 322babf commit 878eb8a

17 files changed

Lines changed: 510 additions & 317 deletions

File tree

amber/src/main/python/core/architecture/packaging/output_manager.py

Lines changed: 52 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -133,47 +133,31 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri_base: st
133133
state materialization on the same port. `storage_uri_base` is the
134134
port's base URI; the result and state URIs are derived from it.
135135
"""
136-
document, _ = DocumentFactory.open_document(
137-
VFSURIFactory.result_uri(storage_uri_base)
138-
)
139-
buffered_item_writer = document.writer(str(get_worker_index(self.worker_id)))
140-
writer_queue = Queue()
141-
port_storage_writer = PortStorageWriter(
142-
buffered_item_writer=buffered_item_writer, queue=writer_queue
143-
)
144-
writer_thread = threading.Thread(
145-
target=port_storage_writer.run,
146-
daemon=True,
147-
name=f"port_storage_writer_thread_{port_id}",
148-
)
149-
writer_thread.start()
150-
self._port_storage_writers[port_id] = (
151-
writer_queue,
152-
port_storage_writer,
153-
writer_thread,
154-
)
155136

156-
state_document, _ = DocumentFactory.open_document(
157-
VFSURIFactory.state_uri(storage_uri_base)
158-
)
159-
state_buffered_item_writer = state_document.writer(
160-
str(get_worker_index(self.worker_id))
161-
)
162-
state_writer_queue = Queue()
163-
state_port_writer = PortStorageWriter(
164-
buffered_item_writer=state_buffered_item_writer,
165-
queue=state_writer_queue,
166-
)
167-
state_writer_thread = threading.Thread(
168-
target=state_port_writer.run,
169-
daemon=True,
170-
name=f"port_state_writer_thread_{port_id}",
137+
def start_writer(uri: str, name_prefix: str, registry: dict) -> None:
138+
document, _ = DocumentFactory.open_document(uri)
139+
writer_queue = Queue()
140+
writer = PortStorageWriter(
141+
buffered_item_writer=document.writer(
142+
str(get_worker_index(self.worker_id))
143+
),
144+
queue=writer_queue,
145+
)
146+
thread = threading.Thread(
147+
target=writer.run, daemon=True, name=f"{name_prefix}_{port_id}"
148+
)
149+
thread.start()
150+
registry[port_id] = (writer_queue, writer, thread)
151+
152+
start_writer(
153+
VFSURIFactory.result_uri(storage_uri_base),
154+
"port_storage_writer_thread",
155+
self._port_storage_writers,
171156
)
172-
state_writer_thread.start()
173-
self._port_state_writers[port_id] = (
174-
state_writer_queue,
175-
state_port_writer,
176-
state_writer_thread,
157+
start_writer(
158+
VFSURIFactory.state_uri(storage_uri_base),
159+
"port_state_writer_thread",
160+
self._port_state_writers,
177161
)
178162

179163
def get_port(self, port_id=None) -> WorkerPort:
@@ -203,14 +187,22 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
203187
PortStorageWriterElement(data_tuple=tuple_)
204188
)
205189

206-
def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
190+
def save_state_to_storage_if_needed(
191+
self,
192+
state: State,
193+
loop_counter: int = 0,
194+
loop_start_id: str = "",
195+
port_id=None,
196+
) -> None:
207197
# When port_id is omitted the same state row is fanned out to
208198
# every output port's state table. This mirrors the
209199
# broadcast-to-all-workers behavior on the emit side: state is
210200
# shared context, not per-key data, so every downstream operator
211201
# (and every worker reading the materialization) needs the full
212202
# set.
213-
element = PortStorageWriterElement(data_tuple=state.to_tuple())
203+
element = PortStorageWriterElement(
204+
data_tuple=state.to_tuple(loop_counter, loop_start_id)
205+
)
214206
if port_id is None:
215207
for writer_queue, _, _ in self._port_state_writers.values():
216208
writer_queue.put(element)
@@ -223,18 +215,16 @@ def close_port_storage_writers(self) -> None:
223215
writer threads to finish, which indicates the port storage writing
224216
are finished.
225217
"""
226-
for _, writer, _ in self._port_storage_writers.values():
227-
# This non-blocking stop call will let the storage writers
228-
# flush the remaining buffer
229-
writer.stop()
230-
for _, _, writer_thread in self._port_storage_writers.values():
231-
# This blocking call will wait for all the writer to finish commit
232-
writer_thread.join()
233-
for _, state_writer, _ in self._port_state_writers.values():
234-
state_writer.stop()
235-
for _, _, state_writer_thread in self._port_state_writers.values():
236-
state_writer_thread.join()
237-
self._port_state_writers.clear()
218+
for registry in (self._port_storage_writers, self._port_state_writers):
219+
# Non-blocking stop lets each writer flush its remaining buffer;
220+
# the join then waits for the commit to finish.
221+
for _, writer, _ in registry.values():
222+
writer.stop()
223+
for _, _, thread in registry.values():
224+
thread.join()
225+
# Drop the stopped writers so a later close doesn't act on
226+
# stale entries.
227+
registry.clear()
238228

239229
def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None:
240230
"""
@@ -290,15 +280,22 @@ def emit_ecm(
290280
)
291281

292282
def emit_state(
293-
self, state: State
283+
self,
284+
state: State,
285+
loop_counter: int = 0,
286+
loop_start_id: str = "",
294287
) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]:
295288
return chain(
296289
*(
297290
(
298291
(
299292
receiver,
300293
(
301-
StateFrame(payload)
294+
StateFrame(
295+
payload,
296+
loop_counter=loop_counter,
297+
loop_start_id=loop_start_id,
298+
)
302299
if isinstance(payload, State)
303300
else self.tuple_to_frame(payload)
304301
),

amber/src/main/python/core/models/payload.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,11 @@ class DataFrame(DataPayload):
3434
@dataclass
3535
class StateFrame(DataPayload):
3636
frame: State
37+
# Loop-control bookkeeping owned by the worker runtime, carried alongside
38+
# the State payload (not inside it) so it never collides with user state.
39+
# Defaults are the "no loop" values for all non-loop state.
40+
loop_counter: int = 0
41+
# Which LoopStart to jump back to. Set by the runtime on a LoopStart's
42+
# output, consumed by the matching LoopEnd. Empty for non-loop /
43+
# not-yet-stamped state.
44+
loop_start_id: str = ""

amber/src/main/python/core/models/state.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,50 @@
2525

2626
class State(dict):
2727
CONTENT = "content"
28-
SCHEMA = Schema(raw_schema={CONTENT: "STRING"})
28+
# Loop-control bookkeeping owned by the worker runtime, NOT user state -- it
29+
# never appears in the content JSON. In memory it rides on the StateFrame
30+
# envelope; it is materialized/serialized as its own column (parallel to
31+
# content) by to_tuple(...). from_tuple() returns the bare State; callers
32+
# that need these values read the corresponding columns off the tuple.
33+
LOOP_COUNTER = "loop_counter"
34+
LOOP_START_ID = "loop_start_id"
35+
SCHEMA = Schema(
36+
raw_schema={
37+
CONTENT: "STRING",
38+
LOOP_COUNTER: "LONG",
39+
LOOP_START_ID: "STRING",
40+
}
41+
)
2942

3043
def to_json(self) -> str:
3144
return json.dumps(_to_json_value(self), separators=(",", ":"))
3245

33-
def to_tuple(self) -> Tuple:
34-
return Tuple({State.CONTENT: self.to_json()}, schema=State.SCHEMA)
46+
@staticmethod
47+
def to_columns(
48+
content_json: str,
49+
loop_counter: int = 0,
50+
loop_start_id: str = "",
51+
) -> dict:
52+
"""The single column-name -> value mapping for the State wire/storage
53+
format. Both ``to_tuple`` (iceberg materialization) and the network
54+
sender build from this, so adding a column is a one-line change here
55+
rather than in every serializer.
56+
"""
57+
return {
58+
State.CONTENT: content_json,
59+
State.LOOP_COUNTER: int(loop_counter),
60+
State.LOOP_START_ID: loop_start_id,
61+
}
62+
63+
def to_tuple(
64+
self,
65+
loop_counter: int = 0,
66+
loop_start_id: str = "",
67+
) -> Tuple:
68+
return Tuple(
69+
State.to_columns(self.to_json(), loop_counter, loop_start_id),
70+
schema=State.SCHEMA,
71+
)
3572

3673
@classmethod
3774
def from_json(cls, payload: str) -> "State":

amber/src/main/python/core/runnables/network_receiver.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@ def data_handler(command: bytes, table: Table) -> int:
9696
"Data",
9797
lambda _: DataFrame(table),
9898
"State",
99-
lambda _: StateFrame(State.from_json(table[State.CONTENT][0].as_py())),
99+
lambda _: StateFrame(
100+
State.from_json(table[State.CONTENT][0].as_py()),
101+
loop_counter=int(table[State.LOOP_COUNTER][0].as_py()),
102+
loop_start_id=table[State.LOOP_START_ID][0].as_py(),
103+
),
100104
"ECM",
101105
lambda _: EmbeddedControlMessage().parse(table["payload"][0].as_py()),
102106
)

amber/src/main/python/core/runnables/network_sender.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@
2020
from overrides import overrides
2121
from typing import Optional
2222

23-
from core.models import DataPayload, InternalQueue, DataFrame, State, StateFrame
23+
from core.models import (
24+
DataPayload,
25+
InternalQueue,
26+
DataFrame,
27+
State,
28+
StateFrame,
29+
)
2430
from core.models.internal_queue import (
2531
InternalQueueElement,
2632
DataElement,
@@ -99,8 +105,13 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None:
99105
self._proxy_client.send_data(bytes(data_header), data_payload.frame)
100106
elif isinstance(data_payload, StateFrame):
101107
data_header = PythonDataHeader(tag=to, payload_type="State")
108+
columns = State.to_columns(
109+
data_payload.frame.to_json(),
110+
data_payload.loop_counter,
111+
data_payload.loop_start_id,
112+
)
102113
table = pa.Table.from_pydict(
103-
{State.CONTENT: [data_payload.frame.to_json()]},
114+
{name: [value] for name, value in columns.items()},
104115
schema=State.SCHEMA.as_arrow_schema(),
105116
)
106117
self._proxy_client.send_data(bytes(data_header), table)

amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,14 @@
3434
from core.architecture.sendsemantics.round_robin_partitioner import (
3535
RoundRobinPartitioner,
3636
)
37-
from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame
37+
from core.models import (
38+
Tuple,
39+
InternalQueue,
40+
DataFrame,
41+
DataPayload,
42+
State,
43+
StateFrame,
44+
)
3845
from core.models.internal_queue import DataElement, ECMElement
3946
from core.storage.document_factory import DocumentFactory
4047
from core.storage.vfs_uri_factory import VFSURIFactory
@@ -152,7 +159,13 @@ def run(self) -> None:
152159
VFSURIFactory.state_uri(self.uri)
153160
)
154161
for state_row in state_document.get():
155-
self.emit_payload(StateFrame(State.from_tuple(state_row)))
162+
self.emit_payload(
163+
StateFrame(
164+
State.from_tuple(state_row),
165+
loop_counter=state_row[State.LOOP_COUNTER],
166+
loop_start_id=state_row[State.LOOP_START_ID],
167+
)
168+
)
156169

157170
storage_iterator = self.materialization.get()
158171
# Iterate and process tuples.

amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ class OutputManager(
242242
// emit side: state is shared context, not per-key data, so every
243243
// downstream operator (and every worker reading the materialization)
244244
// needs the full set.
245-
stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple)))
245+
stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple())))
246246
}
247247

248248
/**

amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int], val actorId: ActorVirtu
125125
case DataFrame(frame) =>
126126
writeArrowStream(mutable.Queue(ArraySeq.unsafeWrapArray(frame): _*), from, "Data")
127127
case StateFrame(state) =>
128-
writeArrowStream(mutable.Queue(state.toTuple), from, "State")
128+
writeArrowStream(mutable.Queue(state.toTuple()), from, "State")
129129
}
130130
}
131131

0 commit comments

Comments
 (0)