4949import org .springframework .data .mongodb .core .convert .MongoConverter ;
5050import org .springframework .data .mongodb .core .convert .QueryMapper ;
5151import org .springframework .data .mongodb .core .convert .UpdateMapper ;
52+ import org .springframework .data .mongodb .core .mapping .MongoPersistentEntity ;
5253import org .springframework .data .mongodb .core .mapping .event .BeforeConvertCallback ;
5354import org .springframework .data .mongodb .core .mapping .event .BeforeConvertEvent ;
5455import org .springframework .data .mongodb .core .query .Collation ;
7576import com .mongodb .internal .client .model .bulk .ConcreteClientUpdateOneOptions ;
7677
7778/**
79+ * NOT THREAD SAFE!!!
7880 * @author Christoph Strobl
7981 */
8082class NamespacedBulkOperationSupport <T > implements NamespaceAwareBulkOperations <T > {
@@ -90,7 +92,7 @@ public NamespacedBulkOperationSupport(BulkMode mode, NamespacedBulkOperationCont
9092
9193 this .bulkMode = mode ;
9294 this .ctx = ctx ;
93- this .currentNamespace = new Namespace (ctx .database (), null );
95+ this .currentNamespace = new Namespace (ctx .database (), null , null );
9496 this .operations = operations ;
9597 }
9698
@@ -165,11 +167,12 @@ public NamespaceAwareBulkOperations<T> upsert(List<Pair<Query, Update>> updates)
165167 @ Override
166168 public NamespaceAwareBulkOperations <T > remove (Query query ) {
167169
170+ Namespace namespace = currentNamespace ;
168171 ClientDeleteManyOptions deleteOptions = ClientDeleteManyOptions .clientDeleteManyOptions ();
169172 query .getCollation ().map (Collation ::toMongoCollation ).ifPresent (deleteOptions ::collation );
170173
171- addModel (query ,
172- ClientNamespacedWriteModel . deleteMany ( mongoNamespace ( currentNamespace ), query .getQueryObject (), deleteOptions ));
174+ addModel (query , ClientNamespacedWriteModel . deleteMany ( mongoNamespace ( namespace ),
175+ getMappedQuery ( query .getQueryObject (), namespace ), deleteOptions ));
173176
174177 return this ;
175178 }
@@ -196,11 +199,11 @@ public NamespaceAwareBulkOperations<T> replaceOne(Query query, Object replacemen
196199 }
197200 query .getCollation ().map (Collation ::toMongoCollation ).ifPresent (replaceOptions ::collation );
198201
199- Namespace ns = currentNamespace ;
200- maybeEmitEvent (new BeforeConvertEvent <>(replacement , ns .name ()));
201- Object source = maybeInvokeBeforeConvertCallback (replacement , ns );
202- addModel (source , ClientNamespacedWriteModel .replaceOne (mongoNamespace (ns ), query . getQueryObject ( ),
203- getMappedObject (source ), replaceOptions ));
202+ Namespace namespace = currentNamespace ;
203+ maybeEmitEvent (new BeforeConvertEvent <>(replacement , namespace .name ()));
204+ Object source = maybeInvokeBeforeConvertCallback (replacement , namespace );
205+ addModel (source , ClientNamespacedWriteModel .replaceOne (mongoNamespace (namespace ),
206+ getMappedQuery ( query . getQueryObject (), namespace ), getMappedObject (source ), replaceOptions ));
204207
205208 return this ;
206209 }
@@ -242,23 +245,31 @@ public NamespaceAwareBulkOperations<Object> inCollection(String collection,
242245 }
243246
244247 @ Override
245- @ SuppressWarnings ("unchecked" )
246248 public <S > NamespaceAwareBulkOperations <S > inCollection (Class <S > type ) {
247- return inCollection (operations .getCollectionName (type ));
249+ return inCollection (operations .getCollectionName (type ), type );
248250 }
249251
250252 @ Override
251- @ SuppressWarnings ({ "rawtypes" , "unchecked" })
252- public NamespaceAwareBulkOperations inCollection (String collection ) {
253+ @ SuppressWarnings ("unchecked" )
254+ public NamespaceAwareBulkOperations <Object > inCollection (String collection ) {
255+ return changeCollection (collection , null );
256+ }
253257
254- this .currentNamespace = new Namespace (currentNamespace .database (), collection );
258+ @ SuppressWarnings ("unchecked" )
259+ public <S > NamespaceAwareBulkOperations <S > inCollection (String collection , @ Nullable Class <S > type ) {
260+ return changeCollection (collection , type );
261+ }
262+
263+ @ SuppressWarnings ({ "rawtypes" })
264+ private NamespaceAwareBulkOperations changeCollection (String collection , @ Nullable Class <?> type ) {
265+ this .currentNamespace = new Namespace (currentNamespace .database (), collection , type );
255266 return this ;
256267 }
257268
258269 @ Override
259270 public NamespaceBulkOperations switchDatabase (String databaseName ) {
260271
261- this .currentNamespace = new Namespace (databaseName , null );
272+ this .currentNamespace = new Namespace (databaseName , null , null );
262273 return this ;
263274 }
264275
@@ -319,8 +330,10 @@ private void update(Namespace namespace, Query query, UpdateDefinition update, b
319330 mayOptions .hint (options .getHint ());
320331 mayOptions .hintString (options .getHintString ());
321332
322- addModel (update , ClientNamespacedWriteModel .updateMany (mongoNamespace (namespace ), query .getQueryObject (),
323- update .getUpdateObject (), mayOptions ));
333+ addModel (update ,
334+ ClientNamespacedWriteModel .updateMany (mongoNamespace (namespace ),
335+ getMappedQuery (query .getQueryObject (), namespace ), getMappedUpdate (update .getUpdateObject (), namespace ),
336+ mayOptions ));
324337 } else {
325338
326339 ClientUpdateOneOptions mayOptions = new ConcreteClientUpdateOneOptions ();
@@ -329,9 +342,31 @@ private void update(Namespace namespace, Query query, UpdateDefinition update, b
329342 mayOptions .upsert (options .isUpsert ());
330343 mayOptions .hint (options .getHint ());
331344 mayOptions .hintString (options .getHintString ());
332- addModel (update , ClientNamespacedWriteModel .updateOne (mongoNamespace (namespace ), query .getQueryObject (),
333- update .getUpdateObject (), mayOptions ));
345+ addModel (update ,
346+ ClientNamespacedWriteModel .updateOne (mongoNamespace (namespace ),
347+ getMappedQuery (query .getQueryObject (), namespace ), getMappedUpdate (update .getUpdateObject (), namespace ),
348+ mayOptions ));
349+ }
350+ }
351+
352+ private Document getMappedUpdate (Document updateObject , Namespace namespace ) {
353+ if (namespace .type () == null ) {
354+ return ctx .updateMapper ().getMappedObject (updateObject , (MongoPersistentEntity <?>) null );
334355 }
356+ MongoPersistentEntity <?> persistentEntity = ctx .updateMapper ().getMappingContext ()
357+ .getPersistentEntity (namespace .type ());
358+ return ctx .updateMapper ().getMappedObject (updateObject , persistentEntity );
359+ }
360+
361+ private Document getMappedQuery (Document query , Namespace namespace ) {
362+
363+ if (namespace .type () == null ) {
364+ return ctx .queryMapper ().getMappedObject (query , (MongoPersistentEntity <?>) null );
365+ }
366+
367+ MongoPersistentEntity <?> persistentEntity = ctx .updateMapper ().getMappingContext ()
368+ .getPersistentEntity (namespace .type ());
369+ return ctx .queryMapper ().getMappedObject (query , persistentEntity );
335370 }
336371
337372 private Document getMappedObject (Object source ) {
@@ -357,7 +392,7 @@ MongoNamespace mongoNamespace(Namespace namespace) {
357392 return namespaces .computeIfAbsent (namespace , key -> new MongoNamespace (key .database (), key .collection ()));
358393 }
359394
360- record Namespace (String database , @ Nullable String collection ) {
395+ record Namespace (String database , @ Nullable String collection , @ Nullable Class <?> type ) {
361396
362397 public String name () {
363398 return String .format ("%s.%s" , database , collection != null ? collection : "n/a" );
0 commit comments