@@ -18,7 +18,8 @@ package cats
1818package effect
1919package std
2020
21- import cats .effect .kernel .{Async , GenConcurrent , Sync }
21+ import cats .effect .kernel ._
22+ import cats .syntax .all ._
2223
2324/**
2425 * A purely functional, concurrent data structure which allows insertion and retrieval of
@@ -38,19 +39,19 @@ abstract class Stack[F[_], A] { self =>
3839 /**
3940 * Pushes the given element to the top of the `Stack`.
4041 *
41- * @param a
42+ * @param element
4243 * the element to push at the top of the `Stack`.
4344 */
44- def push (a : A ): F [Unit ]
45+ def push (element : A ): F [Unit ]
4546
4647 /**
4748 * Pushes the given elements to the top of the `Stack`, the last element will be the final
4849 * top.
4950 *
50- * @param as
51+ * @param elements
5152 * the elements to push at the top of the `Stack`.
5253 */
53- def pushN (as : A * ): F [Unit ]
54+ def pushN (elements : A * ): F [Unit ]
5455
5556 /**
5657 * Takes the top element of `Stack`, if there is none it will semantically block until one is
@@ -92,11 +93,11 @@ abstract class Stack[F[_], A] { self =>
9293 */
9394 final def mapK [G [_]](f : F ~> G ): Stack [G , A ] =
9495 new Stack [G , A ] {
95- override def push (a : A ): G [Unit ] =
96- f(self.push(a ))
96+ override def push (element : A ): G [Unit ] =
97+ f(self.push(element ))
9798
98- override def pushN (as : A * ): G [Unit ] =
99- f(self.pushN(as : _* ))
99+ override def pushN (elements : A * ): G [Unit ] =
100+ f(self.pushN(elements : _* ))
100101
101102 override def pop : G [A ] =
102103 f(self.pop)
@@ -117,12 +118,127 @@ object Stack {
117118 /**
118119 * Creates a new `Stack`.
119120 */
120- def apply [F [_], A ](implicit F : GenConcurrent [F , _]): F [Stack [F , A ]] =
121- ???
121+ def apply [F [_], A ](implicit F : Concurrent [F ]): F [Stack [F , A ]] =
122+ // Initialize the state with an empty stack.
123+ Ref .of[F , StackState [F , A ]](StackState .empty).map(state => new ConcurrentImpl (state))
122124
123125 /**
124126 * Creates a new `Stack`. Like `apply` but initializes state using another effect constructor.
125127 */
126128 def in [F [_], G [_], A ](implicit F : Sync [F ], G : Async [G ]): F [Stack [G , A ]] =
127- ???
129+ // Initialize the state with an empty stack.
130+ Ref .in[F , G , StackState [G , A ]](StackState .empty).map(state => new ConcurrentImpl (state))
131+
132+ private final case class StackState [F [_], A ](
133+ elements : List [A ],
134+ waiters : collection.immutable.Queue [Deferred [F , A ]]
135+ ) {
136+ type CopyResult = StackState [F , A ]
137+ type ModifyResult [R ] = (CopyResult , R )
138+
139+ def push (element : A )(implicit F : Concurrent [F ]): ModifyResult [F [Unit ]] =
140+ waiters.dequeueOption match {
141+ case Some ((waiter, remainingWaiters)) =>
142+ this .copy(waiters = remainingWaiters) -> waiter.complete(element).void
143+
144+ case None =>
145+ this .copy(elements = element :: this .elements) -> F .unit
146+ }
147+
148+ def pushN (elements : Seq [A ])(implicit F : Concurrent [F ]): ModifyResult [F [Unit ]] =
149+ if (this .waiters.isEmpty)
150+ // If there are no waiters we just push all the elements in reverse order.
151+ this .copy(elements = this .elements.prependedAll(elements.reverseIterator)) -> F .unit
152+ else {
153+ // Otherwise, if there is at least one waiter, we take all we can.
154+ val (remaining, waitersToNotify) =
155+ elements.reverse.align(this .waiters).partitionMap(_.unwrap)
156+
157+ // We notify all the waiters we could take.
158+ val notifyWaiters = waitersToNotify.traverse_ {
159+ case (element, waiter) =>
160+ waiter.complete(element).void
161+ }
162+
163+ // The remaining elements are either all elements, or all waiters.
164+ val newState = remaining.parTraverse(_.toEitherNec) match {
165+ case Left (remainingElements) =>
166+ // If only elements remained, then we preserve all the pending waiters,
167+ // and set the Stack elements as the remaining ones.
168+ // This is safe because the remaining elements are already in the correct order,
169+ // and since there was at least one waiter then we can assume there were not pending elements.
170+ this .copy(elements = remainingElements.toList)
171+
172+ case Right (remainingWaiters) =>
173+ // If only waiters remained, then we create a new Queue from them.
174+ this .copy(waiters = collection.immutable.Queue .from(remainingWaiters))
175+ }
176+
177+ newState -> notifyWaiters
178+ }
179+
180+ def pop (
181+ waiter : Deferred [F , A ],
182+ poll : Poll [F ]
183+ )(
184+ implicit F : Concurrent [F ]
185+ ): ModifyResult [F [A ]] =
186+ elements match {
187+ case head :: tail =>
188+ this .copy(elements = tail) -> F .pure(head)
189+
190+ case Nil =>
191+ this .copy(waiters = waiters.enqueue(waiter)) -> poll(waiter.get)
192+ }
193+
194+ def removeWaiter (waiter : Deferred [F , A ]): CopyResult =
195+ this .copy(waiters = this .waiters.filterNot(_ eq waiter))
196+
197+ def tryPop : ModifyResult [Option [A ]] =
198+ elements match {
199+ case head :: tail =>
200+ this .copy(elements = tail) -> Some (head)
201+
202+ case Nil =>
203+ this -> None
204+ }
205+ }
206+
207+ private object StackState {
208+ def empty [F [_], A ]: StackState [F , A ] = StackState (
209+ elements = List .empty,
210+ waiters = collection.immutable.Queue .empty
211+ )
212+ }
213+
214+ private final class ConcurrentImpl [F [_], A ](
215+ state : Ref [F , StackState [F , A ]]
216+ )(
217+ implicit F : Concurrent [F ]
218+ ) extends Stack [F , A ] {
219+ override def push (element : A ): F [Unit ] =
220+ F .uncancelable(_ => state.flatModify(_.push(element)))
221+
222+ override def pushN (elements : A * ): F [Unit ] =
223+ F .uncancelable(_ => state.flatModify(_.pushN(elements)))
224+
225+ override final val pop : F [A ] =
226+ F .uncancelable { poll =>
227+ for {
228+ waiter <- Deferred [F , A ]
229+ wait <- state.modify(_.pop(waiter, poll))
230+ waitCancelledFinalizer = state.update(_.removeWaiter(waiter))
231+ result <- F .onCancel(wait, waitCancelledFinalizer)
232+ } yield result
233+ }
234+
235+ override final val tryPop : F [Option [A ]] =
236+ F .uncancelable(_ => state.modify(_.tryPop))
237+
238+ override final val peek : F [Option [A ]] =
239+ state.get.map(_.elements.headOption)
240+
241+ override final val size : F [Int ] =
242+ state.get.map(_.elements.size)
243+ }
128244}
0 commit comments