|
| 1 | +----------------------- MODULE BlockingQueuePoisonPill ----------------------- |
| 2 | +EXTENDS Naturals, Sequences, FiniteSets |
| 3 | + |
| 4 | +CONSTANTS Producers, (* the (nonempty) set of producers *) |
| 5 | + Consumers, (* the (nonempty) set of consumers *) |
| 6 | + BufCapacity (* the maximum number of messages in the bounded buffer *) |
| 7 | + |
| 8 | +ASSUME Assumption == |
| 9 | + /\ Producers # {} (* at least one producer *) |
| 10 | + /\ Consumers # {} (* at least one consumer *) |
| 11 | + /\ Producers \intersect Consumers = {} (* no thread is both consumer and producer *) |
| 12 | + /\ BufCapacity \in (Nat \ {0}) (* buffer capacity is at least 1 *) |
| 13 | + |
| 14 | +VARIABLES buffer, waitSet, prod, cons |
| 15 | +vars == <<buffer, waitSet, prod, cons>> |
| 16 | + |
| 17 | +----------------------------------------------------------------------------- |
| 18 | + |
| 19 | +NotifyOther(t) == |
| 20 | + LET S == IF t \in Producers THEN waitSet \ Producers ELSE waitSet \ Consumers |
| 21 | + IN IF S # {} |
| 22 | + THEN \E x \in S : waitSet' = waitSet \ {x} |
| 23 | + ELSE UNCHANGED waitSet |
| 24 | + |
| 25 | +Wait(t) == /\ waitSet' = waitSet \cup {t} |
| 26 | + /\ UNCHANGED <<buffer>> |
| 27 | + |
| 28 | +----------------------------------------------------------------------------- |
| 29 | + |
| 30 | +Poison == CHOOSE v : TRUE |
| 31 | + |
| 32 | +Put(t, d) == |
| 33 | + /\ UNCHANGED <<prod, cons>> |
| 34 | + /\ t \notin waitSet |
| 35 | + /\ \/ /\ Len(buffer) < BufCapacity |
| 36 | + /\ buffer' = Append(buffer, d) |
| 37 | + /\ NotifyOther(t) |
| 38 | + \/ /\ Len(buffer) = BufCapacity |
| 39 | + /\ Wait(t) |
| 40 | + |
| 41 | +Get(t) == |
| 42 | + /\ UNCHANGED <<prod>> |
| 43 | + /\ t \notin waitSet |
| 44 | + /\ \/ /\ buffer # <<>> |
| 45 | + /\ buffer' = Tail(buffer) |
| 46 | + /\ NotifyOther(t) |
| 47 | + /\ IF Head(buffer) = Poison |
| 48 | + \* A "poison pill" terminates this consumer. |
| 49 | + THEN cons' = cons \ {t} |
| 50 | + ELSE UNCHANGED <<cons>> |
| 51 | + \/ /\ buffer = <<>> |
| 52 | + /\ Wait(t) |
| 53 | + /\ UNCHANGED <<cons>> |
| 54 | + |
| 55 | +\* Producers can terminate at any time unless blocked/waiting. |
| 56 | +Terminate(t) == |
| 57 | + /\ UNCHANGED <<buffer, waitSet, cons>> |
| 58 | + /\ t \notin waitSet |
| 59 | + /\ prod' = prod \ {t} |
| 60 | + |
| 61 | +(* |
| 62 | + A dedicated "janitor" process sends a poisonous pill to each Consumer after |
| 63 | + all producers have terminated. The poisoned pill causes the Consumers to |
| 64 | + terminate in turn. Synchronization between the Producers and the Janitor is |
| 65 | + left implicit. Possible implementations are discussed below. |
| 66 | +*) |
| 67 | +Cleanup == |
| 68 | + \* An implementation could use e.g. a Phaser that Producers arrive |
| 69 | + \* one, and cleanup runs as part of the phaser's onadvance. Obviously, |
| 70 | + \* this simply delegates part of the problem we are trying to solve |
| 71 | + \* to another concurrency primitive, which might be acceptable but |
| 72 | + \* cannot be considered elegant. |
| 73 | + /\ prod = {} |
| 74 | + \* This could be implemented with a basic counter that keeps track of |
| 75 | + \* the number of Consumers that still have to receive a Poison Pill. |
| 76 | + /\ cons # {} |
| 77 | + /\ \/ buffer = <<>> |
| 78 | + \* ...there a fewer Poison messages in the buffer than (non-terminated) |
| 79 | + \* Consumers. |
| 80 | + \/ Cardinality(cons) < Cardinality({i \in DOMAIN buffer: buffer[i]=Poison}) |
| 81 | + \* Make one of the producers the janitor that cleans up (we always |
| 82 | + \* choose the same janitor). An implementation may simply create a fresh |
| 83 | + \* process/thread (here it would be a nuisance because of TypeInv...). |
| 84 | + /\ Put(CHOOSE p \in Producers: TRUE, Poison) |
| 85 | + |
| 86 | +----------------------------------------------------------------------------- |
| 87 | + |
| 88 | +(* Initially, the buffer is empty and no thread is waiting. *) |
| 89 | +Init == /\ prod = Producers |
| 90 | + /\ cons = Consumers |
| 91 | + /\ buffer = <<>> |
| 92 | + /\ waitSet = {} |
| 93 | + |
| 94 | +(* Then, pick a thread out of all running threads and have it do its thing. *) |
| 95 | +Next == |
| 96 | + /\ \/ \E p \in prod: Put(p, p) |
| 97 | + \/ \E p \in prod: Terminate(p) |
| 98 | + \/ \E c \in cons: Get(c) |
| 99 | + \/ Cleanup |
| 100 | + |
| 101 | +----------------------------------------------------------------------------- |
| 102 | + |
| 103 | +(* TLA+ is untyped, thus lets verify the range of some values in each state. *) |
| 104 | +TypeInv == |
| 105 | + /\ buffer \in Seq(Producers \cup {Poison}) |
| 106 | + /\ Len(buffer) \in 0..BufCapacity |
| 107 | + /\ waitSet \in SUBSET (Consumers \cup Producers) |
| 108 | + /\ prod \in SUBSET Producers |
| 109 | + /\ cons \in SUBSET Consumers |
| 110 | + |
| 111 | +(* No Deadlock *) |
| 112 | +NoDeadlock == waitSet # (Producers \cup Consumers) |
| 113 | + |
| 114 | +\* The queue is empty after (global) termination. |
| 115 | +QueueEmpty == |
| 116 | + ((prod \cup cons) = {}) => (buffer = <<>>) |
| 117 | + |
| 118 | +\* The system terminates iff all producers terminate. |
| 119 | +GlobalTermination == |
| 120 | + (prod = {}) ~> [](cons = {}) |
| 121 | + |
| 122 | +Spec == |
| 123 | + Init /\ [][Next]_vars /\ WF_vars(Next) |
| 124 | + |
| 125 | +============================================================================= |
0 commit comments