Skip to content

Commit 5979066

Browse files
event handling
1 parent 1367b1f commit 5979066

1 file changed

Lines changed: 95 additions & 82 deletions

File tree

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/NamespacedBulkOperationSupport.java

Lines changed: 95 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,27 @@
2626
import org.springframework.dao.DataAccessException;
2727
import org.springframework.data.mapping.callback.EntityCallback;
2828
import org.springframework.data.mapping.callback.EntityCallbacks;
29+
import org.springframework.data.mapping.context.MappingContext;
2930
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
3031
import org.springframework.data.mongodb.core.NamespaceBulkOperations.NamespaceAwareBulkOperations;
32+
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
33+
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
34+
import org.springframework.data.mongodb.core.aggregation.FieldLookupPolicy;
35+
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
3136
import org.springframework.data.mongodb.core.convert.MongoConverter;
3237
import org.springframework.data.mongodb.core.convert.QueryMapper;
3338
import org.springframework.data.mongodb.core.convert.UpdateMapper;
3439
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
40+
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
3541
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertCallback;
3642
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
43+
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveCallback;
44+
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
3745
import org.springframework.data.mongodb.core.query.Collation;
3846
import org.springframework.data.mongodb.core.query.Query;
3947
import org.springframework.data.mongodb.core.query.Update;
4048
import org.springframework.data.mongodb.core.query.UpdateDefinition;
4149
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
42-
import org.springframework.data.util.Lazy;
4350
import org.springframework.data.util.Pair;
4451
import org.springframework.lang.CheckReturnValue;
4552
import org.springframework.util.Assert;
@@ -51,11 +58,7 @@
5158
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
5259
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
5360
import com.mongodb.client.model.bulk.ClientDeleteManyOptions;
54-
import com.mongodb.client.model.bulk.ClientNamespacedDeleteManyModel;
5561
import com.mongodb.client.model.bulk.ClientNamespacedInsertOneModel;
56-
import com.mongodb.client.model.bulk.ClientNamespacedReplaceOneModel;
57-
import com.mongodb.client.model.bulk.ClientNamespacedUpdateManyModel;
58-
import com.mongodb.client.model.bulk.ClientNamespacedUpdateOneModel;
5962
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
6063
import com.mongodb.client.model.bulk.ClientReplaceOneOptions;
6164
import com.mongodb.client.model.bulk.ClientUpdateOneOptions;
@@ -183,7 +186,7 @@ public ClientBulkWriteResult doWithClient(MongoCluster cluster) throws MongoExce
183186
ClientBulkWriteOptions cbws = ClientBulkWriteOptions.clientBulkWriteOptions()
184187
.ordered(NamespacedBulkOperationSupport.this.bulkMode.equals(BulkMode.ORDERED));
185188

186-
return cluster.bulkWrite(bulkOperationPipeline.writeModels(), cbws);
189+
return cluster.bulkWrite(bulkOperationPipeline.models(), cbws);
187190
}
188191
});
189192

@@ -323,7 +326,7 @@ public NamespacedBulkOperation(Namespace namespace) {
323326
@CheckReturnValue
324327
abstract NamespacedBulkOperation map(NamespacedBulkOperationContext context);
325328

326-
abstract ClientNamespacedWriteModel model();
329+
abstract ClientNamespacedWriteModel prepareForWrite(NamespacedBulkOperationContext context);
327330

328331
MongoNamespace mongoNamespace() {
329332
return new MongoNamespace(namespace.database(), namespace.collection());
@@ -344,7 +347,6 @@ static class NamespacedBulkInsert extends NamespacedBulkOperation {
344347

345348
private final Object source;
346349
private final @Nullable Document mappedObject;
347-
private final Lazy<ClientNamespacedInsertOneModel> model;
348350

349351
public NamespacedBulkInsert(Namespace namespace, Object source) {
350352
this(namespace, source, null);
@@ -354,9 +356,6 @@ private NamespacedBulkInsert(Namespace namespace, Object source, @Nullable Docum
354356
super(namespace);
355357
this.source = source;
356358
this.mappedObject = mappedObject;
357-
this.model = mappedObject != null
358-
? Lazy.of(() -> ClientNamespacedWriteModel.insertOne(mongoNamespace(), mappedObject))
359-
: Lazy.empty();
360359
}
361360

362361
public Document getMappedObject() {
@@ -383,8 +382,11 @@ NamespacedBulkInsert map(NamespacedBulkOperationContext context) {
383382
}
384383

385384
@Override
386-
ClientNamespacedInsertOneModel model() {
387-
return model.get();
385+
ClientNamespacedInsertOneModel prepareForWrite(NamespacedBulkOperationContext context) {
386+
387+
context.publishEvent(new BeforeSaveEvent<>(source, mappedObject, namespace.name()));
388+
context.callback(BeforeSaveCallback.class, source, mappedObject, namespace);
389+
return ClientNamespacedWriteModel.insertOne(mongoNamespace(), mappedObject);
388390
}
389391
}
390392

@@ -393,11 +395,11 @@ static abstract class BaseNamespacedBulkUpdate extends NamespacedBulkOperation {
393395
protected final Query query;
394396
protected final UpdateDefinition update;
395397
protected final boolean upsert;
396-
protected final @Nullable Document mappedUpdate;
398+
protected final @Nullable Object mappedUpdate;
397399
protected final @Nullable Document mappedQuery;
398400

399401
public BaseNamespacedBulkUpdate(Namespace namespace, Query query, UpdateDefinition source, boolean upsert,
400-
@Nullable Document mappedUpdate, @Nullable Document mappedQuery) {
402+
@Nullable Object mappedUpdate, @Nullable Document mappedQuery) {
401403
super(namespace);
402404
this.query = query;
403405
this.update = source;
@@ -408,14 +410,29 @@ public BaseNamespacedBulkUpdate(Namespace namespace, Query query, UpdateDefiniti
408410

409411
protected static Document mapUpdate(NamespacedBulkOperationContext context, Document updateObject,
410412
Namespace namespace) {
413+
411414
if (namespace.type() == null) {
412415
return context.updateMapper().getMappedObject(updateObject, (MongoPersistentEntity<?>) null);
413416
}
417+
414418
MongoPersistentEntity<?> persistentEntity = context.updateMapper().getMappingContext()
415419
.getPersistentEntity(namespace.type());
416420
return context.updateMapper().getMappedObject(updateObject, persistentEntity);
417421
}
418422

423+
protected static List<Document> mapUpdatePipeline(NamespacedBulkOperationContext context, AggregationUpdate source,
424+
Namespace namespace) {
425+
426+
Class<?> type = namespace.type();
427+
428+
MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext = context.queryMapper()
429+
.getMappingContext();
430+
AggregationOperationContext aggregationContext = new TypeBasedAggregationOperationContext(type, mappingContext,
431+
context.queryMapper(), FieldLookupPolicy.relaxed());
432+
433+
return new AggregationUtil(context.queryMapper(), mappingContext).createPipeline(source, aggregationContext);
434+
}
435+
419436
/**
420437
* @param multi flag to indicate if update might affect multiple documents.
421438
* @return new instance of {@link UpdateOptions}.
@@ -444,84 +461,87 @@ protected UpdateOptions updateOptions(boolean multi) {
444461

445462
static class NamespacedBulkUpdateOne extends BaseNamespacedBulkUpdate {
446463

447-
Lazy<ClientNamespacedUpdateOneModel> model;
448-
449464
public NamespacedBulkUpdateOne(Namespace namespace, Query query, UpdateDefinition source, boolean upsert) {
450465
this(namespace, query, source, upsert, null, null);
451466
}
452467

453468
public NamespacedBulkUpdateOne(Namespace namespace, Query query, UpdateDefinition source, boolean upsert,
454-
@Nullable Document mappedUpdate, @Nullable Document mappedQuery) {
469+
@Nullable Object mappedUpdate, @Nullable Document mappedQuery) {
455470
super(namespace, query, source, upsert, mappedUpdate, mappedQuery);
456-
457-
model = mappedUpdate != null && mappedUpdate != null ? Lazy.of(() -> {
458-
459-
UpdateOptions options = updateOptions(false);
460-
ClientUpdateOneOptions updateOneOptions = new ConcreteClientUpdateOneOptions();
461-
updateOneOptions.arrayFilters(options.getArrayFilters());
462-
updateOneOptions.collation(options.getCollation());
463-
updateOneOptions.upsert(options.isUpsert());
464-
updateOneOptions.hint(options.getHint());
465-
updateOneOptions.hintString(options.getHintString());
466-
467-
return ClientNamespacedWriteModel.updateOne(mongoNamespace(), mappedQuery, mappedUpdate, updateOneOptions);
468-
}) : Lazy.empty();
469471
}
470472

471473
@Override
472474
NamespacedBulkUpdateOne map(NamespacedBulkOperationContext context) {
473475

474-
Document mappedUpdate = mapUpdate(context, update.getUpdateObject(), namespace);
476+
Object mappedUpdate = update instanceof AggregationUpdate aggregationUpdate
477+
? mapUpdatePipeline(context, aggregationUpdate, namespace)
478+
: mapUpdate(context, update.getUpdateObject(), namespace);
475479
Document mappedQuery = mapQuery(context, query.getQueryObject(), namespace);
476480

477481
return new NamespacedBulkUpdateOne(namespace, query, update, upsert, mappedUpdate, mappedQuery);
478482
}
479483

480484
@Override
481-
ClientNamespacedWriteModel model() {
482-
return model.get();
485+
@SuppressWarnings("unchecked")
486+
ClientNamespacedWriteModel prepareForWrite(NamespacedBulkOperationContext context) {
487+
UpdateOptions options = updateOptions(false);
488+
ClientUpdateOneOptions updateOneOptions = new ConcreteClientUpdateOneOptions();
489+
updateOneOptions.arrayFilters(options.getArrayFilters());
490+
updateOneOptions.collation(options.getCollation());
491+
updateOneOptions.upsert(options.isUpsert());
492+
updateOneOptions.hint(options.getHint());
493+
updateOneOptions.hintString(options.getHintString());
494+
495+
if (mappedUpdate instanceof List<?> pipeline) {
496+
return ClientNamespacedWriteModel.updateOne(mongoNamespace(), mappedQuery, (List<Document>) pipeline,
497+
updateOneOptions);
498+
}
499+
return ClientNamespacedWriteModel.updateOne(mongoNamespace(), mappedQuery, (Document) mappedUpdate,
500+
updateOneOptions);
483501
}
484502

485503
}
486504

487505
static class NamespacedBulkUpdateMany extends BaseNamespacedBulkUpdate {
488506

489-
Lazy<ClientNamespacedUpdateManyModel> model;
490-
491507
public NamespacedBulkUpdateMany(Namespace namespace, Query query, UpdateDefinition source, boolean upsert) {
492508
this(namespace, query, source, upsert, null, null);
493509
}
494510

495511
public NamespacedBulkUpdateMany(Namespace namespace, Query query, UpdateDefinition source, boolean upsert,
496-
@Nullable Document mappedUpdate, @Nullable Document mappedQuery) {
512+
@Nullable Object mappedUpdate, @Nullable Document mappedQuery) {
497513
super(namespace, query, source, upsert, mappedUpdate, mappedQuery);
498-
499-
model = mappedUpdate != null && mappedUpdate != null ? Lazy.of(() -> {
500-
501-
UpdateOptions options = updateOptions(true);
502-
ConcreteClientUpdateManyOptions updateOneOptions = new ConcreteClientUpdateManyOptions();
503-
updateOneOptions.arrayFilters(options.getArrayFilters());
504-
updateOneOptions.collation(options.getCollation());
505-
updateOneOptions.upsert(options.isUpsert());
506-
updateOneOptions.hint(options.getHint());
507-
updateOneOptions.hintString(options.getHintString());
508-
509-
return ClientNamespacedWriteModel.updateMany(mongoNamespace(), mappedQuery, mappedUpdate, updateOneOptions);
510-
}) : Lazy.empty();
511514
}
512515

513516
@Override
514517
NamespacedBulkUpdateMany map(NamespacedBulkOperationContext context) {
515518

516-
Document mappedUpdate = mapUpdate(context, update.getUpdateObject(), namespace);
519+
Object mappedUpdate = update instanceof AggregationUpdate aggregationUpdate
520+
? mapUpdatePipeline(context, aggregationUpdate, namespace)
521+
: mapUpdate(context, update.getUpdateObject(), namespace);
517522
Document mappedQuery = mapQuery(context, query.getQueryObject(), namespace);
518523

519524
return new NamespacedBulkUpdateMany(namespace, query, update, upsert, mappedUpdate, mappedQuery);
520525
}
521526

522527
@Override
523-
ClientNamespacedWriteModel model() {
524-
return model.get();
528+
@SuppressWarnings("unchecked")
529+
ClientNamespacedWriteModel prepareForWrite(NamespacedBulkOperationContext context) {
530+
531+
UpdateOptions options = updateOptions(true);
532+
ConcreteClientUpdateManyOptions updateOneOptions = new ConcreteClientUpdateManyOptions();
533+
updateOneOptions.arrayFilters(options.getArrayFilters());
534+
updateOneOptions.collation(options.getCollation());
535+
updateOneOptions.upsert(options.isUpsert());
536+
updateOneOptions.hint(options.getHint());
537+
updateOneOptions.hintString(options.getHintString());
538+
539+
if (mappedUpdate instanceof List<?> pipeline) {
540+
return ClientNamespacedWriteModel.updateMany(mongoNamespace(), mappedQuery, (List<Document>) pipeline,
541+
updateOneOptions);
542+
}
543+
return ClientNamespacedWriteModel.updateMany(mongoNamespace(), mappedQuery, (Document) mappedUpdate,
544+
updateOneOptions);
525545
}
526546

527547
}
@@ -530,7 +550,6 @@ static class NamespacedBulkRemove extends NamespacedBulkOperation {
530550

531551
private final Query query;
532552
private final @Nullable Document mappedQuery;
533-
private final Lazy<ClientNamespacedDeleteManyModel> model;
534553

535554
public NamespacedBulkRemove(Namespace namespace, Query query) {
536555
this(namespace, query, null);
@@ -540,13 +559,6 @@ public NamespacedBulkRemove(Namespace namespace, Query query, @Nullable Document
540559
super(namespace);
541560
this.query = query;
542561
this.mappedQuery = mappedQuery;
543-
this.model = mappedQuery != null ? Lazy.of(() -> {
544-
545-
ClientDeleteManyOptions deleteOptions = ClientDeleteManyOptions.clientDeleteManyOptions();
546-
query.getCollation().map(Collation::toMongoCollation).ifPresent(deleteOptions::collation);
547-
548-
return ClientNamespacedWriteModel.deleteMany(mongoNamespace(), mappedQuery, deleteOptions);
549-
}) : Lazy.empty();
550562
}
551563

552564
@Override
@@ -555,8 +567,12 @@ NamespacedBulkRemove map(NamespacedBulkOperationContext context) {
555567
}
556568

557569
@Override
558-
ClientNamespacedWriteModel model() {
559-
return model.get();
570+
ClientNamespacedWriteModel prepareForWrite(NamespacedBulkOperationContext context) {
571+
572+
ClientDeleteManyOptions deleteOptions = ClientDeleteManyOptions.clientDeleteManyOptions();
573+
query.getCollation().map(Collation::toMongoCollation).ifPresent(deleteOptions::collation);
574+
575+
return ClientNamespacedWriteModel.deleteMany(mongoNamespace(), mappedQuery, deleteOptions);
560576
}
561577
}
562578

@@ -569,8 +585,6 @@ static class NamespacedBulkReplace extends NamespacedBulkOperation {
569585
private final @Nullable Document mappedQuery;
570586
private final @Nullable Document mappedReplacement;
571587

572-
Lazy<ClientNamespacedReplaceOneModel> model;
573-
574588
public NamespacedBulkReplace(Namespace namespace, Query query, Object replacement, FindAndReplaceOptions options) {
575589
this(namespace, query, replacement, options, null, null);
576590
}
@@ -583,17 +597,6 @@ public NamespacedBulkReplace(Namespace namespace, Query query, Object replacemen
583597
this.options = options;
584598
this.mappedQuery = mappedQuery;
585599
this.mappedReplacement = mappedReplacement;
586-
587-
model = mappedQuery != null ? Lazy.of(() -> {
588-
ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions();
589-
replaceOptions.upsert(options.isUpsert());
590-
if (query.isSorted()) {
591-
replaceOptions.sort(query.getSortObject());
592-
}
593-
query.getCollation().map(Collation::toMongoCollation).ifPresent(replaceOptions::collation);
594-
595-
return ClientNamespacedWriteModel.replaceOne(mongoNamespace(), mappedQuery, mappedReplacement, replaceOptions);
596-
}) : Lazy.empty();
597600
}
598601

599602
@Override
@@ -613,8 +616,19 @@ NamespacedBulkOperation map(NamespacedBulkOperationContext context) {
613616
}
614617

615618
@Override
616-
ClientNamespacedWriteModel model() {
617-
return model.get();
619+
ClientNamespacedWriteModel prepareForWrite(NamespacedBulkOperationContext context) {
620+
621+
ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions();
622+
replaceOptions.upsert(options.isUpsert());
623+
if (query.isSorted()) {
624+
replaceOptions.sort(query.getSortObject());
625+
}
626+
query.getCollation().map(Collation::toMongoCollation).ifPresent(replaceOptions::collation);
627+
628+
context.publishEvent(new BeforeSaveEvent<>(replacement, mappedReplacement, namespace.name()));
629+
context.callback(BeforeSaveCallback.class, replacement, mappedReplacement, namespace);
630+
631+
return ClientNamespacedWriteModel.replaceOne(mongoNamespace(), mappedQuery, mappedReplacement, replaceOptions);
618632
}
619633
}
620634

@@ -628,13 +642,12 @@ public NamespacedBulkOperationPipeline(NamespacedBulkOperationContext bulkOperat
628642
}
629643

630644
void append(NamespacedBulkOperation operation) {
631-
pipeline.add(operation.map(bulkOperationContext)); // TODO: map here or later on execute?
645+
pipeline.add(operation.map(bulkOperationContext));
632646
}
633647

634-
List<ClientNamespacedWriteModel> writeModels() {
635-
return pipeline.stream().map(NamespacedBulkOperation::model).toList();
648+
List<ClientNamespacedWriteModel> models() {
649+
return pipeline.stream().map(it -> it.prepareForWrite(bulkOperationContext)).toList();
636650
}
637-
638651
}
639652

640653
}

0 commit comments

Comments
 (0)