Skip to content

Commit 698058a

Browse files
balhoffclaude
andauthored
Fix intermittent InterruptedException in ZIO stream termination (#357)
Replace queue.shutdown with a data-driven sentinel (Option[Restriction]) to signal stream completion. queue.shutdown uses ZIO fiber interrupt mechanism, which can cascade through mapZIOParUnordered/flatMapPar and interrupt in-flight effects, causing the entire computation to fail with InterruptedException. This race is timing-dependent and surfaces primarily on low-core CI runners (2 cores) where the scheduling window is wider. The stream now ends cleanly via collectWhile when it encounters a None sentinel, avoiding ZIO interrupts entirely. Fixes ontodev/robot#1166 Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a56ad81 commit 698058a

1 file changed

Lines changed: 10 additions & 10 deletions

File tree

core/src/main/scala/org/renci/relationgraph/RelationGraph.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,42 +66,42 @@ object RelationGraph extends StrictLogging {
6666
allClasses(ontology).map(c => ZIO.succeed(processSubclasses(c, indexedWhelk.state, outputConfig.reflexiveSubclasses, outputConfig.equivalenceAsSubclass, outputConfig.outputClasses, outputConfig.outputIndividuals)))
6767
} else ZStream.empty
6868
val streamZ = for {
69-
queue <- Queue.unbounded[Restriction]
69+
queue <- Queue.unbounded[Option[Restriction]]
7070
activeRestrictions <- Ref.make(0)
7171
seenRefs <- ZIO.foreach(allProperties)(p => Ref.make(Set.empty[AtomicConcept]).map(p -> _)).map(_.toMap)
7272
_ <- traverse(specifiedProperties, properties, classes, queue, activeRestrictions, seenRefs)
73-
restrictionsStream = ZStream.fromQueue(queue).map(r => processRestrictionAndExtendQueue(r, properties, classes, indexedWhelk, outputConfig.mode, specifiedProperties.isEmpty, outputConfig.outputClasses, outputConfig.outputIndividuals, queue, activeRestrictions, seenRefs))
73+
restrictionsStream = ZStream.fromQueue(queue).collectWhile { case Some(r) => r }.map(r => processRestrictionAndExtendQueue(r, properties, classes, indexedWhelk, outputConfig.mode, specifiedProperties.isEmpty, outputConfig.outputClasses, outputConfig.outputIndividuals, queue, activeRestrictions, seenRefs))
7474
allTasks = ontologyDeclarationStream ++ classesTasks ++ restrictionsStream
7575
} yield allTasks.mapZIOParUnordered(JRuntime.getRuntime.availableProcessors)(identity)
7676
ZStream.unwrap(streamZ)
7777
}
7878

7979
def allClasses(ont: OWLOntology): ZStream[Any, Nothing, OWLClass] = ZStream.fromIterable(ont.getClassesInSignature(Imports.INCLUDED).asScala.to(Set) - OWLThing - OWLNothing)
8080

81-
def traverse(specifiedProperties: Set[AtomicConcept], properties: Hierarchy, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = {
81+
def traverse(specifiedProperties: Set[AtomicConcept], properties: Hierarchy, classes: Hierarchy, queue: Queue[Option[Restriction]], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = {
8282
val descendProperties = specifiedProperties.isEmpty
8383
val queryProperties = if (descendProperties) properties.subclasses.getOrElse(Top, Set.empty) - Bottom else specifiedProperties
8484
if (queryProperties.nonEmpty) ZIO.foreachParDiscard(queryProperties) { subprop =>
8585
traverseProperty(subprop, classes, queue, activeRestrictions, seenRefs)
8686
}
87-
else queue.shutdown
87+
else queue.offer(None).unit
8888
}
8989

90-
def traverseProperty(property: AtomicConcept, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = {
90+
def traverseProperty(property: AtomicConcept, classes: Hierarchy, queue: Queue[Option[Restriction]], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = {
9191
val restrictions = (classes.subclasses.getOrElse(Top, Set.empty) - Bottom).map(filler => Restriction(Role(property.id), filler))
9292
val propSeenRef = seenRefs(Role(property.id))
9393
for {
9494
_ <- propSeenRef.update { seenForThisProperty =>
9595
seenForThisProperty ++ restrictions.map(_.filler)
9696
}
9797
_ <- activeRestrictions.update(current => current + restrictions.size)
98-
_ <- queue.offerAll(restrictions).unit
98+
_ <- queue.offerAll(restrictions.map(Option(_))).unit
9999
active <- activeRestrictions.get
100-
_ <- queue.shutdown.when(active < 1)
100+
_ <- queue.offer(None).when(active < 1).unit
101101
} yield ()
102102
}
103103

104-
def processRestrictionAndExtendQueue(restriction: Restriction, properties: Hierarchy, classes: Hierarchy, whelk: IndexedReasonerState, mode: Config.TriplesMode, descendProperties: Boolean, outputClasses: Boolean, outputIndividuals: Boolean, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[TriplesGroup] = {
104+
def processRestrictionAndExtendQueue(restriction: Restriction, properties: Hierarchy, classes: Hierarchy, whelk: IndexedReasonerState, mode: Config.TriplesMode, descendProperties: Boolean, outputClasses: Boolean, outputIndividuals: Boolean, queue: Queue[Option[Restriction]], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[TriplesGroup] = {
105105
val triples = processRestriction(restriction, whelk, mode, outputClasses, outputIndividuals)
106106
val continue = triples.redundant.nonEmpty
107107
for {
@@ -128,9 +128,9 @@ object RelationGraph extends StrictLogging {
128128
} else ZIO.succeed(Set.empty[Restriction])
129129
newRestrictions = directFillerSubclassesRestrictions ++ directSubPropertyRestrictions
130130
_ <- activeRestrictions.update(current => current - 1 + newRestrictions.size)
131-
_ <- queue.offerAll(newRestrictions)
131+
_ <- queue.offerAll(newRestrictions.map(Option(_)))
132132
active <- activeRestrictions.get
133-
_ <- queue.shutdown.when(active < 1)
133+
_ <- queue.offer(None).when(active < 1).unit
134134
} yield triples
135135
}
136136

0 commit comments

Comments
 (0)