Commit 4f56946

test(workflow-core): add unit test coverage for port-identity serde helpers (#4954)
### What changes were proposed in this PR?

Adds `PortIdentitySerdeSpec` covering three Jackson / VFS-URI serde helpers in `org.apache.texera.amber.util.serde`, and tightens the production contract on `GlobalPortIdentitySerde` after review.

Production change (added per review):

| Surface | Change |
| --- | --- |
| `GlobalPortIdentitySerde.serializeAsString` | Rejects `portId < 0`, `logicalOpId` containing `_`, `layerName` containing `_` with `IllegalArgumentException`. |
| `GlobalPortIdentitySerde.deserializeFromString` | Symmetrically rejects `portId < 0`. |
| Python `serialize_global_port_identity` / `deserialize_global_port_identity` | Mirrored: raises `ValueError` on the same invariants. |
| Iceberg test fixtures (`IcebergTableStatsSpec`, `IcebergDocumentSpec`, `test_iceberg_document.py`) | Op-id `test_table_<uuid>` → `test-table-<uuid>` to honor the new contract. `IcebergDocumentConsoleMessagesSpec` is unchanged; `createConsoleMessagesURI` does not go through `serializeAsString`. |

Test coverage:

| Helper | Pinned by this spec |
| --- | --- |
| `GlobalPortIdentitySerde.serializeAsString` / `deserializeFromString` | Default + per-field round-trip; exact format pin (default + non-default values); special-character pass-through (dashes / dots); negative `portId` rejected (serialize + deserialize); underscored `logicalOpId` rejected; underscored `layerName` rejected; format-character no-underscore invariant; seven negative paths: completely malformed, missing field, wrong field order, trailing content, empty body, non-numeric `portId` → `NumberFormatException`, non-boolean flag → `IllegalArgumentException` |
| `PortIdentityKeySerializer.portIdToString` | Exact `id_internal` format. |
| `PortIdentityKeySerializer` + `PortIdentityKeyDeserializer` (Jackson wiring) | The production `JSONUtils.objectMapper`, round-tripping `Map[PortIdentity, String]` and an empty map. |
| `PortIdentityKeyDeserializer.deserializeKey` | Four negative paths (non-integer id, non-boolean flag, missing-separator with non-numeric body, missing-separator with numeric-only body); current lenient extra-segments behavior pinned in a characterization test, with a `pendingUntilFixed` for the stricter contract. |

### Any related issues, documentation, discussions?

Closes #4953

### How was this PR tested?

```
sbt "WorkflowCore/Test/testOnly org.apache.texera.amber.util.serde.PortIdentitySerdeSpec"
# 24 tests, all pass (1 unchanged pendingUntilFixed for PortIdentityKeyDeserializer)
sbt "WorkflowCore/Test/testOnly org.apache.texera.amber.core.storage.VFSURIFactorySpec org.apache.texera.amber.storage.result.iceberg.IcebergDocumentSpec org.apache.texera.amber.storage.result.iceberg.IcebergTableStatsSpec org.apache.texera.amber.storage.result.iceberg.IcebergDocumentConsoleMessagesSpec"
# 30 tests, all pass
sbt "WorkflowExecutionService/Test/testOnly org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResourceSpec"
# 2 tests, all pass
pytest core/util/virtual_identity/
# 17 tests, all pass
sbt "WorkflowCore/scalafmtCheck; WorkflowCore/Test/scalafmtCheck"
# clean
```

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.7)
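For readers who want to see the tightened contract in isolation, here is a minimal, self-contained sketch of the invariants described above. It is not the project's code: `PortKey`, `serialize`, `deserialize`, and the regular expression `_PATTERN` are hypothetical stand-ins (the real parser pattern is not shown in this commit), and only the output format and the three rejection rules are taken from the description.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class PortKey:
    """Hypothetical flat stand-in for GlobalPortIdentity (not the real class)."""

    logical_op_id: str
    layer_name: str
    port_id: int
    internal: bool = False
    is_input: bool = False


# Assumed pattern for illustration only; the production regex may differ.
_PATTERN = re.compile(
    r"\(logicalOpId=([^,]+),layerName=([^,]+),portId=(-?\d+),"
    r"isInternal=(true|false),isInput=(true|false)\)"
)


def serialize(key: PortKey) -> str:
    # Fail fast at the boundary: the encoded form must stay underscore-free.
    if "_" in key.logical_op_id:
        raise ValueError(f"logicalOpId must not contain '_': {key.logical_op_id}")
    if "_" in key.layer_name:
        raise ValueError(f"layerName must not contain '_': {key.layer_name}")
    if key.port_id < 0:
        raise ValueError(f"portId must be non-negative: {key.port_id}")
    return (
        f"(logicalOpId={key.logical_op_id},layerName={key.layer_name},"
        f"portId={key.port_id},isInternal={str(key.internal).lower()},"
        f"isInput={str(key.is_input).lower()})"
    )


def deserialize(encoded: str) -> PortKey:
    match = _PATTERN.fullmatch(encoded)
    if match is None:
        raise ValueError(f"invalid encoding: {encoded}")
    op_id, layer, port_str, internal_str, input_str = match.groups()
    port = int(port_str)
    # Symmetric with serialize: a negative portId is rejected on the way in.
    if port < 0:
        raise ValueError(f"portId must be non-negative: {port}")
    return PortKey(op_id, layer, port, internal_str == "true", input_str == "true")
```

In this sketch the deserializer's pattern deliberately accepts a leading minus sign, so a negative `portId` reaches the explicit check and fails with the same message as the serializer rather than as a generic format error.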
1 parent 4862b92 commit 4f56946

7 files changed

Lines changed: 379 additions & 6 deletions

amber/src/main/python/core/util/virtual_identity.py

Lines changed: 17 additions & 0 deletions
@@ -42,12 +42,27 @@ def serialize_global_port_identity(obj: GlobalPortIdentity) -> str:
     Expected format:
     ``(logicalOpId=<logicalOpId>,layerName=<layerName>,
     portId=<portId.id>,isInternal=<portId.internal>,isInput=<input>)``
+
+    Raises ValueError if `logicalOpId` or `layerName` contains an underscore
+    (VFS URI parsing relies on the absence of '_'), or if `portId` is negative.
     """
     logical_op_id = obj.op_id.logical_op_id.id
     layer_name = obj.op_id.layer_name
     port_id = obj.port_id.id
     is_internal = obj.port_id.internal
     is_input_port = obj.input
+    if "_" in logical_op_id:
+        raise ValueError(
+            f"logicalOpId must not contain '_' "
+            f"(VFS URI parsing relies on this): {logical_op_id}"
+        )
+    if "_" in layer_name:
+        raise ValueError(
+            f"layerName must not contain '_' "
+            f"(VFS URI parsing relies on this): {layer_name}"
+        )
+    if port_id < 0:
+        raise ValueError(f"portId must be non-negative: {port_id}")
     return (
         f"(logicalOpId={logical_op_id},layerName={layer_name},portId={port_id},"
         f"isInternal={str(is_internal).lower()},isInput={str(is_input_port).lower()})"
@@ -72,6 +87,8 @@ def deserialize_global_port_identity(encoded_str: str) -> GlobalPortIdentity:
         match.groups()
     )
     port_id = int(port_id_str)
+    if port_id < 0:
+        raise ValueError(f"portId must be non-negative: {port_id}")
     is_internal = is_internal_str.lower() == "true"
     is_input_port = is_input_str.lower() == "true"
     op_id = PhysicalOpIdentity(

amber/src/test/python/core/storage/iceberg/test_iceberg_document.py

Lines changed: 1 addition & 1 deletion
@@ -86,7 +86,7 @@ def iceberg_document(self, amber_schema):
             ExecutionIdentity(id=0),
             GlobalPortIdentity(
                 op_id=PhysicalOpIdentity(
-                    logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"),
+                    logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"),
                     layer_name="main",
                 ),
                 port_id=PortIdentity(id=0),

amber/src/test/python/core/util/test_virtual_identity.py

Lines changed: 23 additions & 0 deletions
@@ -105,6 +105,21 @@ def test_round_trips_through_deserialize(self):
         assert recovered.port_id.internal is True
         assert recovered.input is False

+    def test_rejects_underscore_in_logical_op_id(self):
+        # VFS-compatibility contract: serialized output must be
+        # underscore-free. Fail fast at the boundary on underscored input.
+        with pytest.raises(ValueError, match="logicalOpId must not contain"):
+            serialize_global_port_identity(_gpi(op_id="__DummyOperator"))
+
+    def test_rejects_underscore_in_layer_name(self):
+        with pytest.raises(ValueError, match="layerName must not contain"):
+            serialize_global_port_identity(_gpi(layer="main_source_0_op"))
+
+    def test_rejects_negative_port_id(self):
+        # Port ids are array indices and must be non-negative.
+        with pytest.raises(ValueError, match="portId must be non-negative"):
+            serialize_global_port_identity(_gpi(port=-1))
+

 class TestDeserializeGlobalPortIdentity:
     def test_parses_canonical_encoded_string(self):
@@ -137,6 +152,14 @@ def test_raises_value_error_on_missing_field(self):
                 "(logicalOpId=op,layerName=l,portId=0,isInternal=true)"
             )

+    def test_raises_value_error_on_negative_port_id(self):
+        # Symmetric with the serializer: tampered URIs with a negative
+        # portId must be rejected on the way back in.
+        with pytest.raises(ValueError, match="portId must be non-negative"):
+            deserialize_global_port_identity(
+                "(logicalOpId=op,layerName=l,portId=-1,isInternal=false,isInput=true)"
+            )
+

 class TestGetFromActorIdForInputPortStorage:
     def test_prefixes_materialization_reader_to_uri_plus_actor_name(self):

common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala

Lines changed: 16 additions & 3 deletions
@@ -23,17 +23,20 @@ import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}

 /**
   * Serialize and deserializes a GlobalPortIdentity object to a string using a custom, human-readable format
-  * to ensure it works with both URI and file path and does not incldue underscore "_" so that it does not
-  * interfere with our own VFS URI parsing.
+  * to ensure it works with both URI and file path and does not include underscore "_" so that it does not
+  * interfere with our own VFS URI parsing. Underscores in `logicalOpId` / `layerName` and negative `portId`
+  * values are rejected with `IllegalArgumentException`.
   */
 object GlobalPortIdentitySerde {
   implicit class SerdeOps(globalPortId: GlobalPortIdentity) {

     /**
       * Serializes a GlobalPortIdentity object into a string using our custom, human-readable format
-      * that works with both URI and file path and does not incldue underscore "_" so that it does not
+      * that works with both URI and file path and does not include underscore "_" so that it does not
       * interfere with our own VFS URI parsing.
       *
+      * @throws java.lang.IllegalArgumentException if `logicalOpId` or `layerName` contains an underscore,
+      * or if `portId.id` is negative.
       * @return A serialized string representation of globalPortId
       */
     def serializeAsString: String = {
@@ -42,6 +45,15 @@ object GlobalPortIdentitySerde {
       val portId = globalPortId.portId.id
       val isInternal = globalPortId.portId.internal
       val isInput = globalPortId.input
+      require(
+        !logicalOpId.contains('_'),
+        s"logicalOpId must not contain '_' (VFS URI parsing relies on this): $logicalOpId"
+      )
+      require(
+        !layerName.contains('_'),
+        s"layerName must not contain '_' (VFS URI parsing relies on this): $layerName"
+      )
+      require(portId >= 0, s"portId must be non-negative: $portId")
       s"(logicalOpId=$logicalOpId,layerName=$layerName,portId=$portId,isInternal=$isInternal,isInput=$isInput)"
     }
   }
@@ -58,6 +70,7 @@ object GlobalPortIdentitySerde {
     serializedGlobalPortId match {
       case pattern(logicalOpId, layerName, portIdStr, isInternalStr, isInputStr) =>
         val portIdInt = portIdStr.toInt
+        require(portIdInt >= 0, s"portId must be non-negative: $portIdInt")
         val isInternal = isInternalStr.toBoolean
         val isInput = isInputStr.toBoolean
         val physicalOpId = PhysicalOpIdentity(

common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
       GlobalPortIdentity(
         PhysicalOpIdentity(
           logicalOpId =
-            OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+            OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
           layerName = "main"
         ),
         PortIdentity()

common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ class IcebergTableStatsSpec extends AnyFlatSpec with BeforeAndAfterAll with Suit
       GlobalPortIdentity(
         PhysicalOpIdentity(
           logicalOpId =
-            OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+            OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
           layerName = "main"
         ),
         PortIdentity()
