|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, |
| 13 | + * software distributed under the License is distributed on an |
| 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + * KIND, either express or implied. See the License for the |
| 16 | + * specific language governing permissions and limitations |
| 17 | + * under the License. |
| 18 | + */ |
| 19 | + |
| 20 | +package org.apache.texera.workflow |
| 21 | + |
| 22 | +import com.fasterxml.jackson.databind.JsonNode |
| 23 | +import com.fasterxml.jackson.databind.exc.{MismatchedInputException, ValueInstantiationException} |
| 24 | +import org.apache.texera.amber.core.virtualidentity.OperatorIdentity |
| 25 | +import org.apache.texera.amber.core.workflow.PortIdentity |
| 26 | +import org.apache.texera.amber.util.JSONUtils.objectMapper |
| 27 | +import org.scalatest.flatspec.AnyFlatSpec |
| 28 | + |
| 29 | +class LogicalLinkSpec extends AnyFlatSpec { |
| 30 | + |
| 31 | + // --------------------------------------------------------------------------- |
| 32 | + // Primary constructor + case-class semantics (field access, structural equality, constructor-time validation) |
| 33 | + // --------------------------------------------------------------------------- |
| 34 | + |
| 35 | + "LogicalLink primary constructor" should "expose the four fields it was constructed with" in { |
| 36 | + val link = LogicalLink( |
| 37 | + fromOpId = OperatorIdentity("op-A"), |
| 38 | + fromPortId = PortIdentity(0), |
| 39 | + toOpId = OperatorIdentity("op-B"), |
| 40 | + toPortId = PortIdentity(1, internal = true) |
| 41 | + ) |
| 42 | + assert(link.fromOpId == OperatorIdentity("op-A")) |
| 43 | + assert(link.fromPortId == PortIdentity(0)) |
| 44 | + assert(link.toOpId == OperatorIdentity("op-B")) |
| 45 | + assert(link.toPortId == PortIdentity(1, internal = true)) |
| 46 | + } |
| 47 | + |
| 48 | + "LogicalLink case-class equality" should "use structural equality across all four fields" in { |
| 49 | + val a = |
| 50 | + LogicalLink(OperatorIdentity("x"), PortIdentity(0), OperatorIdentity("y"), PortIdentity(1)) |
| 51 | + val b = |
| 52 | + LogicalLink(OperatorIdentity("x"), PortIdentity(0), OperatorIdentity("y"), PortIdentity(1)) |
| 53 | + assert(a == b) |
| 54 | + assert(a.hashCode == b.hashCode) |
| 55 | + } |
| 56 | + |
| 57 | + it should "distinguish links that differ only in fromOpId" in { |
| 58 | + val a = |
| 59 | + LogicalLink(OperatorIdentity("x"), PortIdentity(0), OperatorIdentity("y"), PortIdentity(1)) |
| 60 | + val b = |
| 61 | + LogicalLink(OperatorIdentity("z"), PortIdentity(0), OperatorIdentity("y"), PortIdentity(1)) |
| 62 | + assert(a != b) |
| 63 | + } |
| 64 | + |
| 65 | + it should "distinguish links that differ only in toPortId.internal" in { |
| 66 | + val a = LogicalLink( |
| 67 | + OperatorIdentity("x"), |
| 68 | + PortIdentity(0), |
| 69 | + OperatorIdentity("y"), |
| 70 | + PortIdentity(1, internal = false) |
| 71 | + ) |
| 72 | + val b = LogicalLink( |
| 73 | + OperatorIdentity("x"), |
| 74 | + PortIdentity(0), |
| 75 | + OperatorIdentity("y"), |
| 76 | + PortIdentity(1, internal = true) |
| 77 | + ) |
| 78 | + assert(a != b) |
| 79 | + } |
| 80 | + |
| 81 | + it should "reject a self-loop link (fromOpId == toOpId) regardless of port" in { |
| 82 | + // The constructor rejects fromOpId == toOpId — a workflow edge whose |
| 83 | + // source and sink are the same operator can never be schedulable, so |
| 84 | + // we fail fast here rather than letting it travel through the planner. NOTE(review): `it` still binds to the "case-class equality" subject declared above, so this and the following validation tests are reported under that subject — consider a dedicated subject. |
| 85 | + val ex = intercept[IllegalArgumentException] { |
| 86 | + LogicalLink( |
| 87 | + OperatorIdentity("op-A"), |
| 88 | + PortIdentity(0), |
| 89 | + OperatorIdentity("op-A"), |
| 90 | + PortIdentity(1) |
| 91 | + ) |
| 92 | + } |
| 93 | + assert(ex.getMessage.contains("self-loop")) |
| 94 | + } |
| 95 | + |
| 96 | + it should "reject a null fromOpId / toOpId in the primary constructor" in { |
| 97 | + intercept[IllegalArgumentException] { |
| 98 | + LogicalLink(null, PortIdentity(0), OperatorIdentity("op-B"), PortIdentity(1)) |
| 99 | + } |
| 100 | + intercept[IllegalArgumentException] { |
| 101 | + LogicalLink(OperatorIdentity("op-A"), PortIdentity(0), null, PortIdentity(1)) |
| 102 | + } |
| 103 | + } |
| 104 | + |
| 105 | + it should "reject an OperatorIdentity wrapping a null id in the primary constructor" in { |
| 106 | + intercept[IllegalArgumentException] { |
| 107 | + LogicalLink( |
| 108 | + OperatorIdentity(null), |
| 109 | + PortIdentity(0), |
| 110 | + OperatorIdentity("op-B"), |
| 111 | + PortIdentity(1) |
| 112 | + ) |
| 113 | + } |
| 114 | + intercept[IllegalArgumentException] { |
| 115 | + LogicalLink( |
| 116 | + OperatorIdentity("op-A"), |
| 117 | + PortIdentity(0), |
| 118 | + OperatorIdentity(null), |
| 119 | + PortIdentity(1) |
| 120 | + ) |
| 121 | + } |
| 122 | + } |
| 123 | + |
| 124 | + it should "reject an OperatorIdentity wrapping an empty id in the primary constructor" in { |
| 125 | + intercept[IllegalArgumentException] { |
| 126 | + LogicalLink(OperatorIdentity(""), PortIdentity(0), OperatorIdentity("op-B"), PortIdentity(1)) |
| 127 | + } |
| 128 | + intercept[IllegalArgumentException] { |
| 129 | + LogicalLink(OperatorIdentity("op-A"), PortIdentity(0), OperatorIdentity(""), PortIdentity(1)) |
| 130 | + } |
| 131 | + } |
| 132 | + |
| 133 | + // --------------------------------------------------------------------------- |
| 134 | + // Secondary @JsonCreator constructor (string opId variant), invoked directly via `new` rather than through Jackson |
| 135 | + // --------------------------------------------------------------------------- |
| 136 | + |
| 137 | + "LogicalLink secondary @JsonCreator constructor" should "wrap raw String op ids in OperatorIdentity" in { |
| 138 | + val link = new LogicalLink( |
| 139 | + fromOpId = "op-A", |
| 140 | + fromPortId = PortIdentity(0), |
| 141 | + toOpId = "op-B", |
| 142 | + toPortId = PortIdentity(1) |
| 143 | + ) |
| 144 | + assert(link.fromOpId == OperatorIdentity("op-A")) |
| 145 | + assert(link.toOpId == OperatorIdentity("op-B")) |
| 146 | + // Equal to a link built via the primary constructor. |
| 147 | + assert( |
| 148 | + link == LogicalLink( |
| 149 | + OperatorIdentity("op-A"), |
| 150 | + PortIdentity(0), |
| 151 | + OperatorIdentity("op-B"), |
| 152 | + PortIdentity(1) |
| 153 | + ) |
| 154 | + ) |
| 155 | + } |
| 156 | + |
| 157 | + it should "accept identifiers containing dashes / dots / digits (no normalization)" in { |
| 158 | + val link = new LogicalLink("my.op-1", PortIdentity(0), "my.op-2", PortIdentity(1)) |
| 159 | + assert(link.fromOpId == OperatorIdentity("my.op-1")) |
| 160 | + assert(link.toOpId == OperatorIdentity("my.op-2")) |
| 161 | + } |
| 162 | + |
| 163 | + it should "reject the empty string as an op id via the @JsonCreator constructor" in { |
| 164 | + intercept[IllegalArgumentException] { |
| 165 | + new LogicalLink("", PortIdentity(0), "op-B", PortIdentity(1)) |
| 166 | + } |
| 167 | + intercept[IllegalArgumentException] { |
| 168 | + new LogicalLink("op-A", PortIdentity(0), "", PortIdentity(1)) |
| 169 | + } |
| 170 | + } |
| 171 | + |
| 172 | + it should "reject a null string op id via the @JsonCreator constructor" in { |
| 173 | + intercept[IllegalArgumentException] { |
| 174 | + new LogicalLink(null: String, PortIdentity(0), "op-B", PortIdentity(1)) |
| 175 | + } |
| 176 | + intercept[IllegalArgumentException] { |
| 177 | + new LogicalLink("op-A", PortIdentity(0), null: String, PortIdentity(1)) |
| 178 | + } |
| 179 | + } |
| 180 | + |
| 181 | + it should "reject a self-loop via the @JsonCreator constructor (same string op id)" in { |
| 182 | + val ex = intercept[IllegalArgumentException] { |
| 183 | + new LogicalLink("op-A", PortIdentity(0), "op-A", PortIdentity(1)) |
| 184 | + } |
| 185 | + assert(ex.getMessage.contains("self-loop")) |
| 186 | + } |
| 187 | + |
| 188 | + // --------------------------------------------------------------------------- |
| 189 | + // Jackson serialization / deserialization (production objectMapper) |
| 190 | + // --------------------------------------------------------------------------- |
| 191 | + // |
| 192 | + // These tests use the same `JSONUtils.objectMapper` that production uses |
| 193 | + // to read user-saved workflow JSON, so a regression in the Jackson |
| 194 | + // wiring (annotations, default-Scala-module config) surfaces here. |
| 195 | + |
| 196 | + "LogicalLink Jackson deserialization" should |
| 197 | + "deserialize fromOpId / toOpId from raw String values via the secondary @JsonCreator constructor" in { |
| 198 | + // Build the JSON by hand to mimic a user-saved workflow file where |
| 199 | + // `fromOpId` and `toOpId` are written as plain strings (the only shape |
| 200 | + // production actually receives, since the frontend emits them as |
| 201 | + // strings). Jackson dispatches to the @JsonCreator string-overload |
| 202 | + // constructor. |
| 203 | + val node = objectMapper.createObjectNode() |
| 204 | + node.put("fromOpId", "op-A") |
| 205 | + node.set("fromPortId", objectMapper.valueToTree[JsonNode](PortIdentity(0))) |
| 206 | + node.put("toOpId", "op-B") |
| 207 | + node.set("toPortId", objectMapper.valueToTree[JsonNode](PortIdentity(1))) |
| 208 | + val link = objectMapper.treeToValue(node, classOf[LogicalLink]) |
| 209 | + assert(link.fromOpId == OperatorIdentity("op-A")) |
| 210 | + assert(link.toOpId == OperatorIdentity("op-B")) |
| 211 | + assert(link.fromPortId == PortIdentity(0)) |
| 212 | + assert(link.toPortId == PortIdentity(1)) |
| 213 | + } |
| 214 | + |
| 215 | + it should "emit `fromOpId` / `toOpId` JSON keys pinned by @JsonProperty annotations" in { |
| 216 | + // Only `fromOpId` / `toOpId` carry `@JsonProperty` in `LogicalLink`; |
| 217 | + // a Scala-side rename of either parameter would still keep the |
| 218 | + // JSON key stable, which is the saved-workflow contract these |
| 219 | + // annotations pin. |
| 220 | + val link = LogicalLink( |
| 221 | + OperatorIdentity("op-A"), |
| 222 | + PortIdentity(0), |
| 223 | + OperatorIdentity("op-B"), |
| 224 | + PortIdentity(1) |
| 225 | + ) |
| 226 | + val tree = objectMapper.valueToTree[JsonNode](link) |
| 227 | + assert(tree.has("fromOpId")) |
| 228 | + assert(tree.has("toOpId")) |
| 229 | + } |
| 230 | + |
| 231 | + it should "emit `fromPortId` / `toPortId` JSON keys derived from Scala parameter names (no @JsonProperty)" in { |
| 232 | + // Pin: the port-id JSON keys come from Scala parameter names since |
| 233 | + // there is no `@JsonProperty` annotation on those fields. A |
| 234 | + // parameter rename WOULD silently break saved-workflow compatibility |
| 235 | + // for these keys — pin so a future rename without an accompanying |
| 236 | + // `@JsonProperty` annotation breaks this on purpose. |
| 237 | + val link = LogicalLink( |
| 238 | + OperatorIdentity("op-A"), |
| 239 | + PortIdentity(0), |
| 240 | + OperatorIdentity("op-B"), |
| 241 | + PortIdentity(1) |
| 242 | + ) |
| 243 | + val tree = objectMapper.valueToTree[JsonNode](link) |
| 244 | + assert(tree.has("fromPortId")) |
| 245 | + assert(tree.has("toPortId")) |
| 246 | + } |
| 247 | + |
| 248 | + it should "NOT round-trip through writeValueAsString (the @JsonCreator string overload is incompatible with the object-shape OperatorIdentity that writeValueAsString emits)" in { |
| 249 | + // Characterization of a real asymmetry tracked by |
| 250 | + // https://github.com/apache/texera/issues/5042. Production reads |
| 251 | + // user-saved workflow JSON where `fromOpId`/`toOpId` are plain |
| 252 | + // strings, but `objectMapper.writeValueAsString` writes |
| 253 | + // OperatorIdentity as `{"id":"op-A"}` (the case-class object form). |
| 254 | + // Re-reading the emitted JSON fails because Jackson dispatches on the |
| 255 | + // @JsonCreator string overload, which can't accept an object for |
| 256 | + // fromOpId. When the issue is fixed (additional @JsonCreator object |
| 257 | + // overload or a custom @JsonDeserialize), this test must flip to a |
| 258 | + // passing round-trip assertion alongside the fix. |
| 259 | + val original = LogicalLink( |
| 260 | + OperatorIdentity("op-A"), |
| 261 | + PortIdentity(0), |
| 262 | + OperatorIdentity("op-B"), |
| 263 | + PortIdentity(1) |
| 264 | + ) |
| 265 | + val json = objectMapper.writeValueAsString(original) |
| 266 | + // Parse the emitted JSON and confirm the structural shape — fromOpId |
| 267 | + // is an object with an `id` field of "op-A". Avoids depending on |
| 268 | + // exact key ordering or escaping. |
| 269 | + val tree = objectMapper.readTree(json) |
| 270 | + assert(tree.path("fromOpId").isObject, s"expected fromOpId to be an object: $json") |
| 271 | + assert(tree.path("fromOpId").path("id").asText() == "op-A") |
| 272 | + // Re-reading the just-emitted JSON fails because the @JsonCreator |
| 273 | + // String overload can't accept the object-shape fromOpId. |
| 274 | + intercept[MismatchedInputException] { |
| 275 | + objectMapper.readValue(json, classOf[LogicalLink]) |
| 276 | + } |
| 277 | + } |
| 278 | + |
| 279 | + it should "reject missing string op-id fields when deserializing via Jackson" in { |
| 280 | + // When `fromOpId` / `toOpId` are omitted, Jackson invokes the |
| 281 | + // @JsonCreator with `null` for the missing String args. The primary |
| 282 | + // constructor's `require` on non-null/non-empty ids then throws, and |
| 283 | + // Jackson wraps it in `ValueInstantiationException` with the original |
| 284 | + // `IllegalArgumentException` as the cause. |
| 285 | + val empty = objectMapper.createObjectNode() |
| 286 | + val ex = intercept[ValueInstantiationException] { |
| 287 | + objectMapper.treeToValue(empty, classOf[LogicalLink]) |
| 288 | + } |
| 289 | + assert(ex.getCause.isInstanceOf[IllegalArgumentException]) |
| 290 | + } |
| 291 | +} |
0 commit comments