@@ -57,8 +57,8 @@ namespace litecore::repl {
5757
5858 // Set up to handle the current message:
5959 DebugAssert (!_revMessage);
60- _revMessage = msg;
6160
61+ _revMessage = msg;
6262 _rev = new RevToInsert (this , _revMessage->property (" id" _sl), _revMessage->property (" rev" _sl),
6363 _revMessage->property (" history" _sl), _revMessage->boolProperty (" deleted" _sl),
6464 _revMessage->boolProperty (" noconflicts" _sl) || _options->noIncomingConflicts (),
@@ -190,6 +190,7 @@ namespace litecore::repl {
190190 }
191191
192192 // We've lost access to this doc on the server; it should be purged.
193+ // This runs on the caller's (Puller's) thread.
193194 void IncomingRev::handleRevokedDoc (RevToInsert* rev) {
194195 reinitialize ();
195196 _rev = rev;
@@ -211,6 +212,7 @@ namespace litecore::repl {
211212 insertRevision ();
212213 }
213214
215+ // This method may be enqueued as normal, or called directly from handleRev on the Puller's thread.
214216 void IncomingRev::parseAndInsert (alloc_slice jsonBody) {
215217 // First create a Fleece document:
216218 Doc fleeceDoc;
@@ -408,7 +410,7 @@ namespace litecore::repl {
408410 revoked ? _puller->revWasProvisionallyHandled <true >() : _puller->revWasProvisionallyHandled <false >();
409411 }
410412
411- // Called by the Inserter after the revision is safely committed to disk.
413+ // Called directly by the Inserter, on its thread, after the revision is safely committed to disk.
412414 void IncomingRev::revisionInserted () {
413415 Retained<IncomingRev> retainSelf = this ;
414416 decrement (_pendingCallbacks);
@@ -454,14 +456,29 @@ namespace litecore::repl {
454456 _blob = _pendingBlobs.end ();
455457 _rev->trim ();
456458
457- _puller->revWasHandled (this );
459+ // finish() can be called either on my queue, or on the Puller's or Inserter's queue.
460+ // If on my queue, I'm not ready to be reset until after the `afterEvent` call following
461+ // this method, so defer telling the Puller I'm done until then:
462+ if ( currentActor () == this ) _finishedAfterEvent = true ;
463+ else
464+ _puller->revWasHandled (this );
458465 }
459466
460467 void IncomingRev::reset () {
461- _rev = nullptr ;
462- _parent = nullptr ;
463- _remoteSequence = {};
464- _bodySize = 0 ;
468+ _rev = nullptr ;
469+ _parent = nullptr ;
470+ _remoteSequence = {};
471+ _bodySize = 0 ;
472+ _finishedAfterEvent = false ;
473+ }
474+
475+ // Call Worker::afterEvent to calculate status and progress, then notify
476+ // Puller we are done.
477+ void IncomingRev::afterEvent () {
478+ Worker::afterEvent ();
479+ // If the finish() method passed the buck to me (see above), call revWasHandled now:
480+ auto expected = true ;
481+ if ( _finishedAfterEvent.compare_exchange_strong (expected, false ) ) _puller->revWasHandled (this );
465482 }
466483
467484 Worker::ActivityLevel IncomingRev::computeActivityLevel (std::string* reason) const {
0 commit comments