|
| 1 | +---------------------- MODULE BlockingQueuePoisonApple ---------------------- |
| 2 | +EXTENDS Naturals, Sequences, FiniteSets |
| 3 | + |
| 4 | +CONSTANTS Producers, (* the (nonempty) set of producers *) |
| 5 | + Consumers, (* the (nonempty) set of consumers *) |
| 6 | + BufCapacity, (* the maximum number of messages in the bounded buffer *) |
| 7 | + Poison |
| 8 | + |
| 9 | +ASSUME Assumption == |
| 10 | + /\ Producers # {} (* at least one producer *) |
| 11 | + /\ Consumers # {} (* at least one consumer *) |
| 12 | + /\ Producers \intersect Consumers = {} (* no thread is both consumer and producer *) |
| 13 | + /\ BufCapacity \in (Nat \ {0}) (* buffer capacity is at least 1 *) |
| 14 | + |
| 15 | +----------------------------------------------------------------------------- |
| 16 | + |
| 17 | +\* prod maps each producer to its apple slices that it will eventually send to |
| 18 | +\* consumers. cons maps each consumer to the apple slices it has eaten so far. |
| 19 | +VARIABLES prod, cons |
| 20 | + |
| 21 | +----------------------------------------------------------------------------- |
| 22 | + |
| 23 | +ap == |
| 24 | + \* The set of active producers, i.e., producers that have yet to send out |
| 25 | + \* some bites of their poison apple. |
| 26 | + {p \in DOMAIN prod: prod[p] > 0} \* <=> prod \notin [P -> {0}] |
| 27 | + |
| 28 | +ac == |
| 29 | + \* The set of active consumers, i.e., consumers that have yet to take a |
| 30 | + \* bite of poison apples before they die/can terminate. |
| 31 | + {c \in DOMAIN cons: cons[c] > 0} \* <=> cons \notin [C -> {0}] |
| 32 | + |
| 33 | +----------------------------------------------------------------------------- |
| 34 | + |
| 35 | +VARIABLES buffer, waitSet |
| 36 | +vars == <<buffer, waitSet, prod, cons>> |
| 37 | + |
| 38 | +NotifyOther(Others) == |
| 39 | + IF waitSet \cap Others # {} |
| 40 | + THEN \E t \in waitSet \cap Others : waitSet' = waitSet \ {t} |
| 41 | + ELSE UNCHANGED waitSet |
| 42 | + |
| 43 | +(* @see java.lang.Object#wait *) |
| 44 | +Wait(t) == /\ waitSet' = waitSet \cup {t} |
| 45 | + /\ UNCHANGED <<buffer>> |
| 46 | + |
| 47 | +----------------------------------------------------------------------------- |
| 48 | + |
| 49 | +(* |
| 50 | + Contrary to PoisonPill, there is no dedicated "janitor" process that requires |
| 51 | + some form of *external* synchronization. Global termination is achieved without |
| 52 | + synchronizing producers. Instead, the tradeoff of this variant is a tighter |
| 53 | + coupling between producers and consumers because producers *and* consumers |
| 54 | + need to know the number (cardinality) of the other role (see the definition of |
| 55 | + prod and cons in Init below to see why that is). |
| 56 | + Why is this complexity necessary, why can't we just have each producer send |
| 57 | + a poison pill to one consumer (and some producers send more than one pill iff |
| 58 | + |C| > |P|)? While this would work iff |C| >= |P|, it fails if |C| < |P|. Let |
| 59 | + P_p be the subset of P such that |P_p| = |C|, i.e. the producers that poison |
| 60 | + consumers. If one or more of the producers not in P_p remain active after all |
| 61 | + producers in P_p terminated, there are no (active) consumers left. |
| 62 | +*) |
| 63 | +Put(t, d) == |
| 64 | + /\ t \notin waitSet |
| 65 | + \* The Producer t must not have initiated its termination. |
| 66 | + /\ prod[t] > 0 |
| 67 | + /\ \/ /\ Len(buffer) < BufCapacity |
| 68 | + /\ \/ /\ buffer' = Append(buffer, d) |
| 69 | + /\ UNCHANGED prod |
| 70 | + \/ /\ buffer' = Append(buffer, Poison) |
| 71 | + \* Producer t messages one consumer. |
| 72 | + /\ prod' = [ prod EXCEPT ![t] = @ - 1] |
| 73 | + /\ NotifyOther(Consumers) |
| 74 | + \/ /\ Len(buffer) = BufCapacity |
| 75 | + /\ Wait(t) |
| 76 | + /\ UNCHANGED prod |
| 77 | + /\ UNCHANGED <<cons>> |
| 78 | + |
| 79 | +Get(t) == |
| 80 | +/\ t \notin waitSet |
| 81 | +/\ cons[t] > 0 |
| 82 | +/\ \/ /\ buffer # <<>> |
| 83 | + /\ buffer' = Tail(buffer) |
| 84 | + /\ NotifyOther(Producers) |
| 85 | + /\ \/ /\ Head(buffer) # Poison |
| 86 | + /\ UNCHANGED <<prod,cons>> |
| 87 | + \/ /\ Head(buffer) = Poison |
| 88 | + /\ cons' = [ cons EXCEPT ![t] = @ - 1] |
| 89 | + /\ UNCHANGED <<prod>> |
| 90 | + \/ /\ buffer = <<>> |
| 91 | + /\ Wait(t) |
| 92 | + /\ UNCHANGED <<prod,cons>> |
| 93 | + |
| 94 | +----------------------------------------------------------------------------- |
| 95 | + |
| 96 | +(* Initially, the buffer is empty and no thread is waiting. *) |
| 97 | +Init == |
| 98 | + /\ buffer = <<>> |
| 99 | + /\ waitSet = {} |
| 100 | + \* The system requires |C| * |P| poison pills to terminate reliably. Perhaps, |
| 101 | + \* this is what Goetz et. al. mean by "unwiedly" on page 156 in their book |
| 102 | + \* Java Concurrency in Practice. However, it is unclear why the authors write |
| 103 | + \* on the same page that "Poison pills work reliably only with unbounded queues." |
| 104 | + /\ cons = [ c \in Consumers |-> Cardinality(Producers) ] |
| 105 | + /\ prod = [ p \in Producers |-> Cardinality(Consumers) ] |
| 106 | + |
| 107 | +(* Then, pick a thread out of all running threads and have it do its thing. *) |
| 108 | +Next == |
| 109 | + \/ \E p \in Producers: Put(p, p) \* Add some data to buffer |
| 110 | + \/ \E c \in Consumers: Get(c) |
| 111 | + |
| 112 | +Spec == |
| 113 | + /\ Init |
| 114 | + /\ [][Next]_vars |
| 115 | + /\ \A c \in Consumers : WF_vars(Get(c)) |
| 116 | + /\ \A p \in Producers : WF_vars(Put(p, p)) |
| 117 | + |
| 118 | +----------------------------------------------------------------------------- |
| 119 | + |
| 120 | +(* TLA+ is untyped, thus lets verify the range of some values in each state. *) |
| 121 | +TypeInv == |
| 122 | + /\ buffer \in Seq(Producers \cup {Poison}) |
| 123 | + /\ Len(buffer) \in 0..BufCapacity |
| 124 | + /\ waitSet \in SUBSET (Producers \cup Consumers) |
| 125 | + /\ prod \in [ Producers -> 0..Cardinality(Consumers) ] |
| 126 | + /\ cons \in [ Consumers -> 0..Cardinality(Producers) ] |
| 127 | + |
| 128 | +(* No Deadlock *) |
| 129 | +NoDeadlock == waitSet # (Producers \cup Consumers) |
| 130 | + |
| 131 | +THEOREM Spec => [](TypeInv /\ NoDeadlock) |
| 132 | +----------------------------------------------------------------------------- |
| 133 | + |
| 134 | +\* The queue is empty after (global) termination. |
| 135 | +QueueEmpty == |
| 136 | + []((ap \cup ac) = {}) => (buffer = <<>>) |
| 137 | + |
| 138 | +\* The system terminates iff all producers terminate. |
| 139 | +GlobalTermination == |
| 140 | + (ap = {}) ~> [](ac = {}) |
| 141 | + |
| 142 | +THEOREM Spec => QueueEmpty /\ GlobalTermination |
| 143 | +----------------------------------------------------------------------------- |
| 144 | + |
| 145 | +SSpec == |
| 146 | + /\ Init |
| 147 | + /\ [][Next]_vars |
| 148 | + /\ \A c \in Consumers : WF_vars(Get(c)) |
| 149 | + \* This fairness constraint causes all producers to eventually terminate. |
| 150 | + /\ \A p \in Producers : SF_vars(Put(p, p) /\ prod' # prod) |
| 151 | + |
| 152 | +Terminates == |
| 153 | + <>[](ap = {}) |
| 154 | + |
| 155 | +THEOREM SSpec => Terminates |
| 156 | +============================================================================= |
0 commit comments