Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,31 @@
package org.apache.texera.workflow

import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
import com.fasterxml.jackson.databind.JsonNode
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.PortIdentity

object LogicalLink {

  /**
    * Turns the raw JSON value of an operator-id field into an [[OperatorIdentity]].
    *
    * Accepted shapes:
    *  - a JSON string: its text becomes the id;
    *  - a JSON object: the text of its `id` member becomes the id.
    *
    * A missing value (Java `null`), an explicit JSON null, or an object whose `id`
    * member is missing or JSON null all yield `OperatorIdentity(null)`; this method
    * does not validate the id itself.
    *
    * @param node      raw JSON value of the field; may be `null`
    * @param fieldName name of the field, used only in the error message
    * @return the identity wrapping the extracted id, or wrapping `null` when absent
    * @throws IllegalArgumentException if the value is present but is neither a string
    *                                  nor an object
    */
  private def readOperatorIdentity(node: JsonNode, fieldName: String): OperatorIdentity = {
    // Treat both a Java null reference and an explicit JSON null as "no value".
    def present(candidate: JsonNode): Option[JsonNode] =
      Option(candidate).filterNot(_.isNull)

    present(node) match {
      case None =>
        OperatorIdentity(null)
      case Some(text) if text.isTextual =>
        OperatorIdentity(text.asText())
      case Some(wrapper) if wrapper.isObject =>
        OperatorIdentity(present(wrapper.get("id")).map(_.asText()).orNull)
      case Some(_) =>
        throw new IllegalArgumentException(
          s"LogicalLink $fieldName must be a string or an object with an id field"
        )
    }
  }
}

case class LogicalLink(
@JsonProperty("fromOpId") fromOpId: OperatorIdentity,
fromPortId: PortIdentity,
Expand All @@ -42,13 +64,27 @@ case class LogicalLink(
s"LogicalLink self-loop not allowed: fromOpId == toOpId == ${fromOpId.id}"
)

@JsonCreator
def this(
@JsonProperty("fromOpId") fromOpId: String,
fromOpId: String,
fromPortId: PortIdentity,
@JsonProperty("toOpId") toOpId: String,
toOpId: String,
toPortId: PortIdentity
) = {
this(OperatorIdentity(fromOpId), fromPortId, OperatorIdentity(toOpId), toPortId)
}

/**
  * Jackson creator that receives `fromOpId` and `toOpId` as raw JSON nodes.
  *
  * Each node is converted by `LogicalLink.readOperatorIdentity`, which accepts
  * either a plain JSON string or an object carrying an `id` member, and the
  * results are forwarded to the primary constructor together with the port ids.
  *
  * @param fromOpId   raw JSON value of the `fromOpId` property
  * @param fromPortId port identity forwarded unchanged to the primary constructor
  * @param toOpId     raw JSON value of the `toOpId` property
  * @param toPortId   port identity forwarded unchanged to the primary constructor
  * @throws IllegalArgumentException if an op-id node is present but is neither a
  *                                  string nor an object
  */
@JsonCreator
def this(
@JsonProperty("fromOpId") fromOpId: JsonNode,
fromPortId: PortIdentity,
@JsonProperty("toOpId") toOpId: JsonNode,
toPortId: PortIdentity
) = {
this(
LogicalLink.readOperatorIdentity(fromOpId, "fromOpId"),
fromPortId,
LogicalLink.readOperatorIdentity(toOpId, "toOpId"),
toPortId
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.texera.workflow

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.exc.{MismatchedInputException, ValueInstantiationException}
import com.fasterxml.jackson.databind.exc.ValueInstantiationException
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.util.JSONUtils.objectMapper
Expand Down Expand Up @@ -131,10 +131,10 @@ class LogicalLinkSpec extends AnyFlatSpec {
}

// ---------------------------------------------------------------------------
// Secondary @JsonCreator constructor (string opId variant)
// Secondary string opId constructor
// ---------------------------------------------------------------------------

"LogicalLink secondary @JsonCreator constructor" should "wrap raw String op ids in OperatorIdentity" in {
"LogicalLink secondary String constructor" should "wrap raw String op ids in OperatorIdentity" in {
val link = new LogicalLink(
fromOpId = "op-A",
fromPortId = PortIdentity(0),
Expand All @@ -160,7 +160,7 @@ class LogicalLinkSpec extends AnyFlatSpec {
assert(link.toOpId == OperatorIdentity("my.op-2"))
}

it should "reject the empty string as an op id via the @JsonCreator constructor" in {
it should "reject the empty string as an op id via the secondary String constructor" in {
intercept[IllegalArgumentException] {
new LogicalLink("", PortIdentity(0), "op-B", PortIdentity(1))
}
Expand All @@ -169,7 +169,7 @@ class LogicalLinkSpec extends AnyFlatSpec {
}
}

it should "reject a null string op id via the @JsonCreator constructor" in {
it should "reject a null string op id via the secondary String constructor" in {
intercept[IllegalArgumentException] {
new LogicalLink(null: String, PortIdentity(0), "op-B", PortIdentity(1))
}
Expand All @@ -178,7 +178,7 @@ class LogicalLinkSpec extends AnyFlatSpec {
}
}

it should "reject a self-loop via the @JsonCreator constructor (same string op id)" in {
it should "reject a self-loop via the secondary String constructor (same string op id)" in {
val ex = intercept[IllegalArgumentException] {
new LogicalLink("op-A", PortIdentity(0), "op-A", PortIdentity(1))
}
Expand All @@ -194,12 +194,11 @@ class LogicalLinkSpec extends AnyFlatSpec {
// wiring (annotations, default-Scala-module config) surfaces here.

"LogicalLink Jackson deserialization" should
"deserialize fromOpId / toOpId from raw String values via the secondary @JsonCreator constructor" in {
"deserialize fromOpId / toOpId from raw String values via the Jackson creator" in {
// Build the JSON by hand to mimic a user-saved workflow file where
// `fromOpId` and `toOpId` are written as plain strings (the only shape
// production actually receives, since the frontend emits them as
// strings). Jackson dispatches to the @JsonCreator string-overload
// constructor.
// strings). Jackson dispatches to the @JsonCreator constructor.
val node = objectMapper.createObjectNode()
node.put("fromOpId", "op-A")
node.set("fromPortId", objectMapper.valueToTree[JsonNode](PortIdentity(0)))
Expand Down Expand Up @@ -245,17 +244,7 @@ class LogicalLinkSpec extends AnyFlatSpec {
assert(tree.has("toPortId"))
}

it should "NOT round-trip through writeValueAsString (the @JsonCreator string overload is incompatible with the object-shape OperatorIdentity that writeValueAsString emits)" in {
// Characterization of a real asymmetry tracked by
// https://github.com/apache/texera/issues/5042. Production reads
// user-saved workflow JSON where `fromOpId`/`toOpId` are plain
// strings, but `objectMapper.writeValueAsString` writes
// OperatorIdentity as `{"id":"op-A"}` (the case-class object form).
// Re-reading the emitted JSON fails because Jackson dispatches on the
// @JsonCreator string overload, which can't accept an object for
// fromOpId. When the issue is fixed (additional @JsonCreator object
// overload or a custom @JsonDeserialize), this test must flip to a
// passing round-trip assertion alongside the fix.
it should "round-trip through writeValueAsString when OperatorIdentity fields use object shape" in {
val original = LogicalLink(
OperatorIdentity("op-A"),
PortIdentity(0),
Expand All @@ -269,16 +258,14 @@ class LogicalLinkSpec extends AnyFlatSpec {
val tree = objectMapper.readTree(json)
assert(tree.path("fromOpId").isObject, s"expected fromOpId to be an object: $json")
assert(tree.path("fromOpId").path("id").asText() == "op-A")
// Re-reading the just-emitted JSON fails because the @JsonCreator
// String overload can't accept the object-shape fromOpId.
intercept[MismatchedInputException] {
objectMapper.readValue(json, classOf[LogicalLink])
}

val roundTripped = objectMapper.readValue(json, classOf[LogicalLink])
assert(roundTripped == original)
}

it should "reject missing string op-id fields when deserializing via Jackson" in {
it should "reject missing op-id fields when deserializing via Jackson" in {
// When `fromOpId` / `toOpId` are omitted, Jackson invokes the
// @JsonCreator with `null` for the missing String args. The primary
// @JsonCreator with `null` for the missing args. The primary
// constructor's `require` on non-null/non-empty ids then throws, and
// Jackson wraps it in `ValueInstantiationException` with the original
// `IllegalArgumentException` as the cause.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,58 @@
package org.apache.texera.amber.compiler.model

import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
import com.fasterxml.jackson.databind.JsonNode
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.PortIdentity

object LogicalLink {

  /**
    * Extracts an [[OperatorIdentity]] from the raw JSON value of an operator-id field.
    *
    * A JSON string contributes its text; a JSON object contributes the text of its
    * `id` member. When the value is absent (Java `null` or JSON null), or the object
    * has no usable `id` member, the returned identity wraps `null`.
    * NOTE(review): no validation of the resulting id is visible here - confirm that
    * a null id is rejected elsewhere if that is required.
    *
    * @param node      raw JSON value of the field; may be `null`
    * @param fieldName name of the field, used only in the error message
    * @return the identity wrapping the extracted id, or wrapping `null` when absent
    * @throws IllegalArgumentException if the value is present but is neither a string
    *                                  nor an object
    */
  private def readOperatorIdentity(node: JsonNode, fieldName: String): OperatorIdentity = {
    // Resolve the raw id first; a Java null and a JSON null both collapse to None.
    val rawId: String = Option(node).filterNot(_.isNull) match {
      case None =>
        null
      case Some(value) if value.isTextual =>
        value.asText()
      case Some(value) if value.isObject =>
        Option(value.get("id")).filterNot(_.isNull).map(_.asText()).orNull
      case Some(_) =>
        throw new IllegalArgumentException(
          s"LogicalLink $fieldName must be a string or an object with an id field"
        )
    }
    OperatorIdentity(rawId)
  }
}

case class LogicalLink(
@JsonProperty("fromOpId") fromOpId: OperatorIdentity,
fromPortId: PortIdentity,
@JsonProperty("toOpId") toOpId: OperatorIdentity,
toPortId: PortIdentity
) {
@JsonCreator
def this(
@JsonProperty("fromOpId") fromOpId: String,
fromOpId: String,
fromPortId: PortIdentity,
@JsonProperty("toOpId") toOpId: String,
toOpId: String,
toPortId: PortIdentity
) = {
this(OperatorIdentity(fromOpId), fromPortId, OperatorIdentity(toOpId), toPortId)
}

/**
  * Jackson creator that receives `fromOpId` and `toOpId` as raw JSON nodes.
  *
  * Each node is converted by `LogicalLink.readOperatorIdentity`, which accepts
  * either a plain JSON string or an object carrying an `id` member, and the
  * results are forwarded to the primary constructor together with the port ids.
  *
  * @param fromOpId   raw JSON value of the `fromOpId` property
  * @param fromPortId port identity forwarded unchanged to the primary constructor
  * @param toOpId     raw JSON value of the `toOpId` property
  * @param toPortId   port identity forwarded unchanged to the primary constructor
  * @throws IllegalArgumentException if an op-id node is present but is neither a
  *                                  string nor an object
  */
@JsonCreator
def this(
@JsonProperty("fromOpId") fromOpId: JsonNode,
fromPortId: PortIdentity,
@JsonProperty("toOpId") toOpId: JsonNode,
toPortId: PortIdentity
) = {
this(
LogicalLink.readOperatorIdentity(fromOpId, "fromOpId"),
fromPortId,
LogicalLink.readOperatorIdentity(toOpId, "toOpId"),
toPortId
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import jakarta.ws.rs.client.Entity
import jakarta.ws.rs.core.{MediaType, Response}
import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo}
import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.operator.filter.{
ComparisonType,
Expand Down Expand Up @@ -127,6 +128,20 @@ class WorkflowCompilationResourceSpec extends AnyFlatSpec with BeforeAndAfterAll
compilationResponse.asInstanceOf[WorkflowCompilationSuccess]
}

it should "round-trip LogicalLink JSON emitted by the production objectMapper" in {
  // Write the link with the same mapper that later reads it, then check that
  // reading the emitted JSON yields an equal LogicalLink.
  val link = LogicalLink(
    fromOpId = OperatorIdentity("op-A"),
    fromPortId = PortIdentity(0),
    toOpId = OperatorIdentity("op-B"),
    toPortId = PortIdentity(1)
  )

  val serialized = objectMapper.writeValueAsString(link)
  val restored = objectMapper.readValue(serialized, classOf[LogicalLink])

  assertThat(restored).isEqualTo(link)
}

it should "compile workflow successfully with multiple filter and limit operations" in {
// construct the LogicalPlan: CSVScan --> Projection --> Limit --> Filter (TotalProfit > 10000) --> Filter (Region != "JPN") --> Limit
val localCsvFilePath =
Expand Down
Loading