@@ -162,7 +162,7 @@ private final class Pipe {
162162 /// After starting the returned producer, `readFD` should not be used
163163 /// anywhere else, as it may close unexpectedly.
164164 func transferReadsToProducer( ) -> ReadProducer {
165- return SignalProducer { observer, disposable in
165+ return SignalProducer { observer, lifetime in
166166 self . group. enter ( )
167167 let channel = DispatchIO ( type: . stream, fileDescriptor: self . readFD, queue: self . queue) { error in
168168 if error == 0 {
@@ -200,7 +200,7 @@ private final class Pipe {
200200 }
201201 }
202202
203- let _ = disposable . add {
203+ lifetime . observeEnded {
204204 channel. close ( flags: . stop)
205205 }
206206 }
@@ -215,7 +215,7 @@ private final class Pipe {
215215 ///
216216 /// Returns a producer that will complete or error.
217217 func writeDataFromProducer( _ producer: SignalProducer < Data , NoError > ) -> SignalProducer < ( ) , TaskError > {
218- return SignalProducer { observer, disposable in
218+ return SignalProducer { observer, lifetime in
219219 self . group. enter ( )
220220 let channel = DispatchIO ( type: . stream, fileDescriptor: self . writeFD, queue: self . queue) { error in
221221 if error == 0 {
@@ -231,9 +231,9 @@ private final class Pipe {
231231 }
232232
233233 producer. startWithSignal { signal, producerDisposable in
234- disposable . add ( producerDisposable)
234+ lifetime += producerDisposable
235235
236- signal. observe ( Observer ( value: { data in
236+ signal. observe ( Signal . Observer ( value: { data in
237237 let dispatchData = data. withUnsafeBytes { ( bytes: UnsafePointer < UInt8 > ) -> DispatchData in
238238 let buffer = UnsafeBufferPointer ( start: bytes, count: data. count)
239239 return DispatchData ( bytes: buffer)
@@ -253,7 +253,7 @@ private final class Pipe {
253253 } ) )
254254 }
255255
256- let _ = disposable . add {
256+ lifetime . observeEnded {
257257 channel. close ( flags: . stop)
258258 }
259259 }
@@ -409,7 +409,7 @@ extension Task {
409409 /// - Returns: A producer that will launch the receiver when started, then send
410410 /// `TaskEvent`s as execution proceeds.
411411 public func launch( standardInput: SignalProducer < Data , NoError > ? = nil ) -> SignalProducer < TaskEvent < Data > , TaskError > {
412- return SignalProducer { observer, disposable in
412+ return SignalProducer { observer, lifetime in
413413 let queue = DispatchQueue ( label: self . description, attributes: [ ] )
414414 let group = Task . group
415415
@@ -466,15 +466,15 @@ extension Task {
466466 }
467467 }
468468
469- return SignalProducer { observer, disposable in
469+ return SignalProducer { observer, lifetime in
470470 func startAggregating( producer: Pipe . ReadProducer , chunk: @escaping ( Data ) -> TaskEvent < Data > ) -> Pipe . ReadProducer {
471471 let aggregated = MutableProperty < Aggregation ? > ( nil )
472472
473473 producer. startWithSignal { signal, signalDisposable in
474- disposable += signalDisposable
474+ lifetime += signalDisposable
475475
476476 var aggregate = Data ( )
477- signal. observe ( Observer ( value: { data in
477+ signal. observe ( Signal . Observer ( value: { data in
478478 observer. send ( value: chunk ( data) )
479479 aggregate. append ( data)
480480 } , failed: { error in
@@ -504,14 +504,14 @@ extension Task {
504504 if terminationStatus == EXIT_SUCCESS {
505505 // Wait for stderr to finish, then pass
506506 // through stdout.
507- disposable += stderrAggregated
507+ lifetime += stderrAggregated
508508 . then ( stdoutAggregated)
509509 . map ( TaskEvent . success)
510510 . start ( observer)
511511 } else {
512512 // Wait for stdout to finish, then pass
513513 // through stderr.
514- disposable += stdoutAggregated
514+ lifetime += stdoutAggregated
515515 . then ( stderrAggregated)
516516 . flatMap ( . concat) { data -> SignalProducer < TaskEvent < Data > , TaskError > in
517517 let errorString = ( data. count > 0 ? String ( data: data, encoding: . utf8) : nil )
@@ -527,15 +527,15 @@ extension Task {
527527 close ( stdoutPipe. writeFD)
528528 close ( stderrPipe. writeFD)
529529
530- disposable += stdinProducer. start ( )
530+ lifetime += stdinProducer. start ( )
531531
532- let _ = disposable . add {
532+ lifetime . observeEnded {
533533 process. terminate ( )
534534 }
535535 }
536536 }
537537 . startWithSignal { signal, taskDisposable in
538- disposable . add ( taskDisposable)
538+ lifetime += taskDisposable
539539 signal. observe ( observer)
540540 }
541541 }
0 commit comments