@@ -589,17 +589,14 @@ public function createDocuments(
589589 array $ documents ,
590590 int $ batchSize = self ::INSERT_BATCH_SIZE ,
591591 ?callable $ onNext = null ,
592+ ?callable $ onError = null ,
592593 ): int {
593- $ modified = 0 ;
594-
595- $ this ->source ->createDocuments (
594+ $ modified = $ this ->source ->createDocuments (
596595 $ collection ,
597596 $ documents ,
598597 $ batchSize ,
599- function ($ doc ) use ($ onNext , &$ modified ) {
600- $ onNext && $ onNext ($ doc );
601- $ modified ++;
602- },
598+ $ onNext ,
599+ $ onError ,
603600 );
604601
605602 if (
@@ -638,7 +635,6 @@ function ($doc) use ($onNext, &$modified) {
638635 $ collection ,
639636 $ clones ,
640637 $ batchSize ,
641- null ,
642638 )
643639 );
644640
@@ -715,18 +711,13 @@ public function updateDocuments(
715711 ?callable $ onNext = null ,
716712 ?callable $ onError = null ,
717713 ): int {
718- $ modified = 0 ;
719-
720- $ this ->source ->updateDocuments (
714+ $ modified = $ this ->source ->updateDocuments (
721715 $ collection ,
722716 $ updates ,
723717 $ queries ,
724718 $ batchSize ,
725- function ($ doc , $ old ) use ($ onNext , &$ modified ) {
726- $ onNext && $ onNext ($ doc , $ old );
727- $ modified ++;
728- },
729- $ onError
719+ $ onNext ,
720+ $ onError ,
730721 );
731722
732723 if (
@@ -754,14 +745,13 @@ function ($doc, $old) use ($onNext, &$modified) {
754745 );
755746 }
756747
757- $ modified = $ this ->destination ->withPreserveDates (
748+ $ this ->destination ->withPreserveDates (
758749 fn () =>
759750 $ this ->destination ->updateDocuments (
760751 $ collection ,
761752 $ clone ,
762753 $ queries ,
763754 $ batchSize ,
764- null ,
765755 )
766756 );
767757
@@ -788,15 +778,12 @@ public function upsertDocuments(
788778 ?callable $ onNext = null ,
789779 ?callable $ onError = null ,
790780 ): int {
791- $ modified = 0 ;
792- $ this ->source ->upsertDocuments (
781+ $ modified = $ this ->source ->upsertDocuments (
793782 $ collection ,
794783 $ documents ,
795784 $ batchSize ,
796- function ($ doc , $ old ) use ($ onNext , &$ modified ) {
797- $ onNext && $ onNext ($ doc , $ old );
798- $ modified ++;
799- }
785+ $ onNext ,
786+ $ onError ,
800787 );
801788
802789 if (
@@ -829,13 +816,12 @@ function ($doc, $old) use ($onNext, &$modified) {
829816 $ clones [] = $ clone ;
830817 }
831818
832- $ modified = $ this ->destination ->withPreserveDates (
819+ $ this ->destination ->withPreserveDates (
833820 fn () =>
834821 $ this ->destination ->upsertDocuments (
835822 $ collection ,
836823 $ clones ,
837824 $ batchSize ,
838- null ,
839825 )
840826 );
841827
@@ -850,8 +836,9 @@ function ($doc, $old) use ($onNext, &$modified) {
850836 }
851837 }
852838 } catch (\Throwable $ err ) {
853- $ this ->logError ('createDocuments ' , $ err );
839+ $ this ->logError ('upsertDocuments ' , $ err );
854840 }
841+
855842 return $ modified ;
856843 }
857844
@@ -905,17 +892,12 @@ public function deleteDocuments(
905892 ?callable $ onNext = null ,
906893 ?callable $ onError = null ,
907894 ): int {
908- $ modified = 0 ;
909-
910- $ this ->source ->deleteDocuments (
895+ $ modified = $ this ->source ->deleteDocuments (
911896 $ collection ,
912897 $ queries ,
913898 $ batchSize ,
914- function ($ doc , $ old ) use (&$ modified , $ onNext ) {
915- $ onNext && $ onNext ($ doc , $ old );
916- $ modified ++;
917- },
918- $ onError
899+ $ onNext ,
900+ $ onError ,
919901 );
920902
921903 if (
@@ -940,11 +922,10 @@ function ($doc, $old) use (&$modified, $onNext) {
940922 );
941923 }
942924
943- $ modified = $ this ->destination ->deleteDocuments (
925+ $ this ->destination ->deleteDocuments (
944926 $ collection ,
945927 $ queries ,
946928 $ batchSize ,
947- null ,
948929 );
949930
950931 foreach ($ this ->writeFilters as $ filter ) {
0 commit comments