3939import org .springframework .data .mongodb .core .convert .UpdateMapper ;
4040import org .springframework .data .mongodb .core .mapping .MongoPersistentEntity ;
4141import org .springframework .data .mongodb .core .mapping .MongoPersistentProperty ;
42+ import org .springframework .data .mongodb .core .mapping .event .AfterSaveCallback ;
43+ import org .springframework .data .mongodb .core .mapping .event .AfterSaveEvent ;
4244import org .springframework .data .mongodb .core .mapping .event .BeforeConvertCallback ;
4345import org .springframework .data .mongodb .core .mapping .event .BeforeConvertEvent ;
4446import org .springframework .data .mongodb .core .mapping .event .BeforeSaveCallback ;
@@ -178,7 +180,7 @@ public NamespaceAwareBulkOperations<T> replaceOne(Query query, Object replacemen
178180 public ClientBulkWriteResult execute () {
179181
180182 // TODO: exceptions need to be translated correctly
181- return operations .doWithClient (new MongoClusterCallback <>() {
183+ ClientBulkWriteResult result = operations .doWithClient (new MongoClusterCallback <>() {
182184
183185 @ Override
184186 public ClientBulkWriteResult doWithClient (MongoCluster cluster ) throws MongoException , DataAccessException {
@@ -190,6 +192,8 @@ public ClientBulkWriteResult doWithClient(MongoCluster cluster) throws MongoExce
190192 }
191193 });
192194
195+ bulkOperationPipeline .postProcess ();
196+ return result ;
193197 }
194198
195199 @ Override
@@ -305,13 +309,13 @@ public NamespacedBulkOperationContext(String database, MongoConverter mongoConve
305309 <T > SourceAwareMappedDocument <T > mapDomainObject (Namespace namespace , T source ) {
306310
307311 publishEvent (new BeforeConvertEvent <>(source , namespace .name ()));
308- Object value = callback (BeforeConvertCallback .class , source , namespace );
312+ T value = callback (BeforeConvertCallback .class , source , namespace );
309313 if (value instanceof Document document ) {
310- return new SourceAwareMappedDocument <>(source , document );
314+ return new SourceAwareMappedDocument <>(value , document );
311315 }
312316 Document sink = new Document ();
313317 mongoConverter .write (value , sink );
314- return new SourceAwareMappedDocument <>(source , sink );
318+ return new SourceAwareMappedDocument <>(value , sink );
315319 }
316320
317321 public Document mapQuery (Namespace namespace , Query query ) {
@@ -321,17 +325,16 @@ public Document mapQuery(Namespace namespace, Query query) {
321325 return queryContext .getMappedQuery (null );
322326 }
323327
324- MongoPersistentEntity <?> persistentEntity = this .mappingContext .getPersistentEntity (namespace .type ());
325- return queryContext .getMappedQuery (persistentEntity );
328+ return queryContext .getMappedQuery (entity (namespace ));
326329 }
327330
328331 public Object mapUpdate (Namespace namespace , Query query , UpdateDefinition updateDefinition , boolean upsert ) {
329332
330333 UpdateContext updateContext = queryOperations .updateContext (updateDefinition , query , upsert );
331- if (updateDefinition instanceof AggregationUpdate aggregationUpdate ) {
334+ if (updateDefinition instanceof AggregationUpdate ) {
332335 return updateContext .getUpdatePipeline (namespace .type ());
333336 }
334- return updateContext .getMappedUpdate (mappingContext . getPersistentEntity (namespace . type () ));
337+ return updateContext .getMappedUpdate (entity (namespace ));
335338 }
336339
337340 public boolean skipEntityCallbacks () {
@@ -374,11 +377,11 @@ public void publishEvent(ApplicationEvent event) {
374377 }
375378
376379 @ Nullable
377- MongoPersistentEntity <?> entity (Class <?> type ) {
378- if (type == null ) {
380+ MongoPersistentEntity <?> entity (Namespace namespace ) {
381+ if (namespace . type () == null ) {
379382 return null ;
380383 }
381- return mappingContext ().getPersistentEntity (type );
384+ return mappingContext ().getPersistentEntity (namespace . type () );
382385 }
383386
384387 MappingContext <? extends MongoPersistentEntity <?>, MongoPersistentProperty > mappingContext () {
@@ -458,6 +461,9 @@ MongoNamespace mongoNamespace() {
458461 return new MongoNamespace (namespace .database (), namespace .collection ());
459462 }
460463
464+ void finish (NamespacedBulkOperationContext context ) {
465+
466+ }
461467 }
462468
463469 static class NamespacedBulkInsert extends NamespacedBulkOperation {
@@ -499,6 +505,12 @@ ClientNamespacedInsertOneModel prepareForWrite(NamespacedBulkOperationContext co
499505 context .callback (BeforeSaveCallback .class , source , mappedObject , namespace );
500506 return ClientNamespacedWriteModel .insertOne (mongoNamespace (), mappedObject );
501507 }
508+
509+ @ Override
510+ void finish (NamespacedBulkOperationContext context ) {
511+ context .publishEvent (new AfterSaveEvent <>(source , mappedObject , namespace .name ()));
512+ context .callback (AfterSaveCallback .class , source , mappedObject , namespace );
513+ }
502514 }
503515
504516 static abstract class BaseNamespacedBulkUpdate extends NamespacedBulkOperation {
@@ -597,7 +609,6 @@ ClientNamespacedWriteModel prepareForWrite(NamespacedBulkOperationContext contex
597609
598610 UpdateOptions options = updateOptions (context );
599611 ClientUpdateManyOptions updateOneOptions = ClientUpdateManyOptions .clientUpdateManyOptions ();
600- ;
601612 updateOneOptions .arrayFilters (options .getArrayFilters ());
602613 updateOneOptions .collation (options .getCollation ());
603614 updateOneOptions .upsert (options .isUpsert ());
@@ -699,6 +710,12 @@ ClientNamespacedWriteModel prepareForWrite(NamespacedBulkOperationContext contex
699710
700711 return ClientNamespacedWriteModel .replaceOne (mongoNamespace (), mappedQuery , mappedReplacement , replaceOptions );
701712 }
713+
714+ @ Override
715+ void finish (NamespacedBulkOperationContext context ) {
716+ context .publishEvent (new AfterSaveEvent <>(replacement , mappedReplacement , namespace .name ()));
717+ context .callback (AfterSaveCallback .class , replacement , mappedReplacement , namespace );
718+ }
702719 }
703720
704721 static class NamespacedBulkOperationPipeline {
@@ -717,6 +734,10 @@ void append(NamespacedBulkOperation operation) {
717734 List <ClientNamespacedWriteModel > models () {
718735 return pipeline .stream ().map (it -> it .prepareForWrite (bulkOperationContext )).toList ();
719736 }
737+
738+ void postProcess () {
739+ pipeline .forEach (it -> it .finish (bulkOperationContext ));
740+ }
720741 }
721742
722743}
0 commit comments