Skip to content

Commit 46df848

Browse files
feat(firestore): update and delete pipeline DML stages (#14331)
Corresponding Java PR which was used as reference: googleapis/java-firestore#2317 --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 53f1991 commit 46df848

4 files changed

Lines changed: 259 additions & 0 deletions

File tree

firestore/pipeline.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,3 +822,94 @@ func (p *Pipeline) RawStage(name string, args []any, opts ...RawStageOptions) *P
822822
}
823823
return p.append(stage)
824824
}
825+
826+
// UpdateOption is an option for an Update pipeline stage.
827+
//
828+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
829+
// regardless of any other documented package stability guarantees.
830+
type UpdateOption interface {
831+
isUpdateOption()
832+
}
833+
834+
type updateTransformationsOption struct {
835+
fields []Selectable
836+
}
837+
838+
func (updateTransformationsOption) isUpdateOption() {}
839+
840+
// WithUpdateTransformations specifies the list of field transformations to apply in an update operation.
841+
//
842+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
843+
// regardless of any other documented package stability guarantees.
844+
func WithUpdateTransformations(field Selectable, additionalFields ...Selectable) UpdateOption {
845+
return updateTransformationsOption{
846+
fields: append([]Selectable{field}, additionalFields...),
847+
}
848+
}
849+
850+
// Update performs an update operation using documents from previous stages.
851+
//
852+
// This method updates the documents in place based on the data flowing through the pipeline.
853+
// You can optionally specify a list of [Selectable] field transformations using [WithUpdateTransformations].
854+
// If no transformations are provided, it performs the update in-place without any changes.
855+
//
856+
// Example:
857+
//
858+
// // In-place update
859+
// client.Pipeline().Literals(updateData).Update()
860+
//
861+
// // Update with transformations
862+
// client.Pipeline().Collection("books").
863+
// Where(GreaterThan("price", 50)).
864+
// Update(WithUpdateTransformations(ConstantOf("Discounted").As("status")))
865+
//
866+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
867+
// regardless of any other documented package stability guarantees.
868+
func (p *Pipeline) Update(opts ...UpdateOption) *Pipeline {
869+
if p.err != nil {
870+
return p
871+
}
872+
873+
var transformations []Selectable
874+
for _, opt := range opts {
875+
if opt != nil {
876+
switch o := opt.(type) {
877+
case updateTransformationsOption:
878+
transformations = append(transformations, o.fields...)
879+
}
880+
}
881+
}
882+
883+
stage, err := newUpdateStage(transformations)
884+
if err != nil {
885+
p.err = err
886+
return p
887+
}
888+
return p.append(stage)
889+
}
890+
891+
// DeleteOption is an option for a Delete pipeline stage.
892+
//
893+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
894+
// regardless of any other documented package stability guarantees.
895+
type DeleteOption interface {
896+
isDeleteOption()
897+
}
898+
899+
// Delete deletes the documents from previous stages.
900+
//
901+
// Example:
902+
//
903+
// client.Pipeline().Collection("logs").
904+
// Where(Equal("status", "archived")).
905+
// Delete()
906+
//
907+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
908+
// regardless of any other documented package stability guarantees.
909+
func (p *Pipeline) Delete(opts ...DeleteOption) *Pipeline {
910+
if p.err != nil {
911+
return p
912+
}
913+
stage := newDeleteStage()
914+
return p.append(stage)
915+
}

firestore/pipeline_integration_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,47 @@ func TestIntegration_PipelineStages(t *testing.T) {
859859
t.Errorf("got %d documents, want 4", len(results))
860860
}
861861
})
862+
t.Run("Update", func(t *testing.T) {
863+
t.Skip("Skipping test until feature is available in PROD")
864+
updateIter := client.Pipeline().Collection(coll.ID).
865+
Where(Equal(FieldOf("author.country"), "UK")).
866+
Update(WithUpdateTransformations(ConstantOf("Active").As("status"))).
867+
Execute(ctx).Results()
868+
defer updateIter.Stop()
869+
_, err := updateIter.GetAll()
870+
if err != nil {
871+
t.Fatalf("Failed to execute update: %v", err)
872+
}
873+
874+
verifyIter := client.Pipeline().Collection(coll.ID).Where(Equal(FieldOf("status"), "Active")).Execute(ctx).Results()
875+
defer verifyIter.Stop()
876+
results, err := verifyIter.GetAll()
877+
if err != nil {
878+
t.Fatalf("Failed to execute verify: %v", err)
879+
}
880+
if len(results) != 4 {
881+
t.Errorf("got %d updated documents, want 4", len(results))
882+
}
883+
})
884+
t.Run("Delete", func(t *testing.T) {
885+
t.Skip("Skipping test until feature is available in PROD")
886+
deleteIter := client.Pipeline().Collection(coll.ID).Where(Equal(FieldOf("title"), "The Great Gatsby")).Delete().Execute(ctx).Results()
887+
defer deleteIter.Stop()
888+
_, err := deleteIter.GetAll()
889+
if err != nil {
890+
t.Fatalf("Failed to execute delete: %v", err)
891+
}
892+
893+
verifyIter := client.Pipeline().Collection(coll.ID).Where(Equal(FieldOf("title"), "The Great Gatsby")).Execute(ctx).Results()
894+
defer verifyIter.Stop()
895+
results, err := verifyIter.GetAll()
896+
if err != nil {
897+
t.Fatalf("Failed to execute verify: %v", err)
898+
}
899+
if len(results) != 0 {
900+
t.Errorf("got %d documents, want 0 after delete", len(results))
901+
}
902+
})
862903
}
863904

864905
func TestIntegration_PipelineFunctions(t *testing.T) {

firestore/pipeline_stage.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
stageNameCollection = "collection"
4343
stageNameCollectionGroup = "collection_group"
4444
stageNameDatabase = "database"
45+
stageNameDelete = "delete"
4546
stageNameDistinct = "distinct"
4647
stageNameDocuments = "documents"
4748
stageNameFindNearest = "find_nearest"
@@ -52,6 +53,7 @@ const (
5253
stageNameSelect = "select"
5354
stageNameUnion = "union"
5455
stageNameUnnest = "unnest"
56+
stageNameUpdate = "update"
5557
stageNameWhere = "where"
5658
)
5759

@@ -598,3 +600,46 @@ func (s *rawStage) toProto() (*pb.Pipeline_Stage, error) {
598600
Options: optionsPb,
599601
}, nil
600602
}
603+
604+
type updateStage struct {
605+
fields []Selectable
606+
}
607+
608+
func newUpdateStage(fields []Selectable) (*updateStage, error) {
609+
return &updateStage{fields: fields}, nil
610+
}
611+
612+
func (s *updateStage) name() string { return stageNameUpdate }
613+
614+
func (s *updateStage) toProto() (*pb.Pipeline_Stage, error) {
615+
var mapVal *pb.Value
616+
if len(s.fields) > 0 {
617+
var err error
618+
mapVal, err = projectionsToMapValue(s.fields)
619+
if err != nil {
620+
return nil, err
621+
}
622+
} else {
623+
mapVal = &pb.Value{ValueType: &pb.Value_MapValue{MapValue: &pb.MapValue{}}}
624+
}
625+
626+
return &pb.Pipeline_Stage{
627+
Name: s.name(),
628+
Args: []*pb.Value{mapVal},
629+
}, nil
630+
}
631+
632+
type deleteStage struct{}
633+
634+
func newDeleteStage() *deleteStage {
635+
return &deleteStage{}
636+
}
637+
638+
func (s *deleteStage) name() string { return stageNameDelete }
639+
640+
func (s *deleteStage) toProto() (*pb.Pipeline_Stage, error) {
641+
return &pb.Pipeline_Stage{
642+
Name: s.name(),
643+
Args: []*pb.Value{},
644+
}, nil
645+
}

firestore/pipeline_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,3 +434,85 @@ func TestPipeline_CreateFromQuery(t *testing.T) {
434434
t.Errorf("toExecutePipelineRequest() mismatch for collection stage (-want +got):\n%s", diff)
435435
}
436436
}
437+
438+
func TestPipeline_Update(t *testing.T) {
439+
client := newTestClient()
440+
ps := &PipelineSource{client: client}
441+
p := ps.Collection("users").Update(WithUpdateTransformations(ConstantOf("Active").As("status")))
442+
443+
req, err := p.toExecutePipelineRequest()
444+
if err != nil {
445+
t.Fatalf("p.toExecutePipelineRequest() failed: %v", err)
446+
}
447+
448+
stages := req.GetStructuredPipeline().GetPipeline().GetStages()
449+
if len(stages) != 2 {
450+
t.Fatalf("Expected 2 stages in proto, got %d", len(stages))
451+
}
452+
453+
wantUpdateStage := &pb.Pipeline_Stage{
454+
Name: "update",
455+
Args: []*pb.Value{
456+
{ValueType: &pb.Value_MapValue{
457+
MapValue: &pb.MapValue{
458+
Fields: map[string]*pb.Value{
459+
"status": {ValueType: &pb.Value_StringValue{StringValue: "Active"}},
460+
},
461+
},
462+
}},
463+
},
464+
}
465+
if diff := cmp.Diff(wantUpdateStage, stages[1], protocmp.Transform()); diff != "" {
466+
t.Errorf("toExecutePipelineRequest() mismatch for update stage (-want +got):\n%s", diff)
467+
}
468+
}
469+
470+
func TestPipeline_Update_Empty(t *testing.T) {
471+
client := newTestClient()
472+
ps := &PipelineSource{client: client}
473+
p := ps.Collection("users").Update()
474+
475+
req, err := p.toExecutePipelineRequest()
476+
if err != nil {
477+
t.Fatalf("p.toExecutePipelineRequest() failed: %v", err)
478+
}
479+
480+
stages := req.GetStructuredPipeline().GetPipeline().GetStages()
481+
if len(stages) != 2 {
482+
t.Fatalf("Expected 2 stages in proto, got %d", len(stages))
483+
}
484+
485+
wantUpdateStage := &pb.Pipeline_Stage{
486+
Name: "update",
487+
Args: []*pb.Value{
488+
{ValueType: &pb.Value_MapValue{MapValue: &pb.MapValue{}}},
489+
},
490+
}
491+
if diff := cmp.Diff(wantUpdateStage, stages[1], protocmp.Transform()); diff != "" {
492+
t.Errorf("toExecutePipelineRequest() mismatch for update stage (empty args) (-want +got):\n%s", diff)
493+
}
494+
}
495+
496+
func TestPipeline_Delete(t *testing.T) {
497+
client := newTestClient()
498+
ps := &PipelineSource{client: client}
499+
p := ps.Collection("users").Delete()
500+
501+
req, err := p.toExecutePipelineRequest()
502+
if err != nil {
503+
t.Fatalf("p.toExecutePipelineRequest() failed: %v", err)
504+
}
505+
506+
stages := req.GetStructuredPipeline().GetPipeline().GetStages()
507+
if len(stages) != 2 {
508+
t.Fatalf("Expected 2 stages in proto, got %d", len(stages))
509+
}
510+
511+
wantDeleteStage := &pb.Pipeline_Stage{
512+
Name: "delete",
513+
Args: []*pb.Value{},
514+
}
515+
if diff := cmp.Diff(wantDeleteStage, stages[1], protocmp.Transform()); diff != "" {
516+
t.Errorf("toExecutePipelineRequest() mismatch for delete stage (-want +got):\n%s", diff)
517+
}
518+
}

0 commit comments

Comments
 (0)