From 0e40fa838eaf2b735bdb00910ddcc41d4a26cac6 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Mon, 6 Apr 2026 20:59:44 +0000 Subject: [PATCH 1/2] feat(firestore): pipeline subqueries --- firestore/pipeline.go | 143 +++++++++++++++++++-- firestore/pipeline_expression.go | 9 +- firestore/pipeline_function.go | 48 ++++++- firestore/pipeline_function_test.go | 38 ++++++ firestore/pipeline_integration_test.go | 167 ++++++++++++++++++++++++- firestore/pipeline_source.go | 23 ++++ firestore/pipeline_stage.go | 46 +++++++ firestore/pipeline_stage_test.go | 2 +- firestore/pipeline_utils.go | 8 ++ 9 files changed, 469 insertions(+), 15 deletions(-) diff --git a/firestore/pipeline.go b/firestore/pipeline.go index bb776d93a572..b3eeaa2a72f5 100644 --- a/firestore/pipeline.go +++ b/firestore/pipeline.go @@ -182,6 +182,8 @@ func (RawOptions) isDocumentsOption() {} func (RawOptions) isLiteralsOption() {} +func (RawOptions) isDefineOption() {} + func (r RawOptions) apply(eo *executeSettings) { if eo.RawOptions == nil { eo.RawOptions = make(map[string]any) @@ -227,6 +229,15 @@ func Selectables(s ...Selectable) []Selectable { return []Selectable(s) } +// AliasedExpressions is a helper function that returns its arguments as a slice of *AliasedExpression. +// It is used to provide variadic-like ergonomics for the [Pipeline.Define] pipeline stage. +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +func AliasedExpressions(v ...*AliasedExpression) []*AliasedExpression { + return v +} + // Execute executes the pipeline and returns a snapshot of the results. // // Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, @@ -242,6 +253,15 @@ func (p *Pipeline) Execute(ctx context.Context, opts ...ExecuteOption) *Pipeline } } + if newP.c == nil { + newP.err = fmt.Errorf("This pipeline was created without a database (e.g., as a subcollection pipeline) and cannot be executed directly. It can only be used as part of another pipeline.") + return &PipelineSnapshot{ + iter: &PipelineResultIterator{ + err: newP.err, + }, + } + } + ctx = withResourceHeader(ctx, newP.c.path()) ctx = withRequestParamsHeader(ctx, reqParamsHeaderVal(newP.c.path())) @@ -877,6 +897,10 @@ func (p *Pipeline) Union(other *Pipeline, opts ...UnionOption) *Pipeline { if p.err != nil { return p } + if other.c == nil { + p.err = fmt.Errorf("Union only supports combining root pipelines, doesn't support relative scope Pipeline like relative subcollection pipeline") + return p + } options := make(map[string]any) for _, opt := range opts { if opt != nil { @@ -913,19 +937,19 @@ type Sampler struct { Mode SampleMode } -// ByDocuments creates a Sampler for sampling a fixed number of documents. +// WithDocLimit creates a Sampler for sampling a fixed number of documents. // // Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, // regardless of any other documented package stability guarantees. -func ByDocuments(limit int) *Sampler { +func WithDocLimit(limit int) *Sampler { return &Sampler{Size: limit, Mode: SampleModeDocuments} } -// ByPercentage creates a Sampler for sampling a percentage of documents. +// WithPercentage creates a Sampler for sampling a percentage of documents. // // Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, // regardless of any other documented package stability guarantees. -func ByPercentage(percentage float64) *Sampler { +func WithPercentage(percentage float64) *Sampler { return &Sampler{Size: percentage, Mode: SampleModePercent} } @@ -941,15 +965,15 @@ type SampleOption interface { // Sample performs a pseudo-random sampling of the documents from the previous stage. // // This stage will filter documents pseudo-randomly. The behavior is defined by the Sampler. -// Use ByDocuments or ByPercentage to create a Sampler. +// Use WithDocLimit or WithPercentage to create a Sampler. // // Example: // // // Sample 10 books, if available. -// client.Pipeline().Collection("books").Sample(ByDocuments(10)) +// client.Pipeline().Collection("books").Sample(WithDocLimit(10)) // // // Sample 50% of books. -// client.Pipeline().Collection("books").Sample(ByPercentage(0.5)) +// client.Pipeline().Collection("books").Sample(WithPercentage(0.5)) // // Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, // regardless of any other documented package stability guarantees. @@ -1242,3 +1266,108 @@ func (p *Pipeline) Delete(opts ...DeleteOption) *Pipeline { stage := newDeleteStage(options) return p.append(stage) } + +// Scalar converts this Pipeline into an expression that evaluates to a single scalar result. +// Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object. +// +// Example: +// +// // Calculate average rating for each restaurant using a subquery +// client.Pipeline().Collection("restaurants"). +// AddFields(Selectables( +// Subcollection("reviews"). +// Aggregate(Accumulators(Average("rating").As("avg_score"))). +// Scalar().As("stats"), +// )) +// // Output format: +// // [ +// // { +// // "name": "The Burger Joint", +// // "stats": { +// // "avg_score": 4.8, +// // "review_count": 120 +// // } +// // } +// // ] +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +func (p *Pipeline) ToScalarExpression() Expression { + return newBaseFunction("scalar", []Expression{newPipelineValueExpression(p)}) +} + +// ToArrayExpression converts this Pipeline into an expression that evaluates to an array result. +// +// Example: +// +// // Embed a subcollection of reviews as an array into each restaurant document +// client.Pipeline().Collection("restaurants"). +// AddFields(Selectables( +// Subcollection("reviews"). +// Select(Fields("reviewer", "rating")). +// ToArrayExpression().As("reviews"), +// )) +// // Output format: +// // [ +// // { +// // "name": "The Burger Joint", +// // "reviews": [ +// // { "reviewer": "Alice", "rating": 5 }, +// // { "reviewer": "Bob", "rating": 4 } +// // ] +// // } +// // ] +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +func (p *Pipeline) ToArrayExpression() Expression { + return newBaseFunction("array", []Expression{newPipelineValueExpression(p)}) +} + +// DefineOption is an option for a Define pipeline stage. +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +type DefineOption interface { + StageOption + isDefineOption() +} + +// Define adds a "let" stage to the pipeline to define variables. +// +// Defines one or more variables in the pipeline's scope. `Define` is used to bind a value to a +// name, which can be referenced in subsequent stages using [Variable]. +// +// Each variable is defined using an [AliasedExpression], which pairs an expression with +// its alias (the variable name). +// +// Example: +// +// // Define a variable and use it in a filter +// client.Pipeline().Collection("products"). +// Define(AliasedExpressions( +// Multiply("price", 0.9).As("discountedPrice"), +// Add("stock", 10).As("newStock"), +// )). +// Where(LessThan(Variable("discountedPrice"), 100)). +// Select(Fields("name", Variable("newStock"))) +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +func (p *Pipeline) Define(variables []*AliasedExpression, opts ...DefineOption) *Pipeline { + if p.err != nil { + return p + } + options := make(map[string]any) + for _, opt := range opts { + if opt != nil { + opt.applyStage(options) + } + } + stage, err := newDefineStage(variables, options) + if err != nil { + p.err = err + return p + } + return p.append(stage) +} diff --git a/firestore/pipeline_expression.go b/firestore/pipeline_expression.go index 9f581dcbc06a..a862e58a37c7 100644 --- a/firestore/pipeline_expression.go +++ b/firestore/pipeline_expression.go @@ -316,6 +316,10 @@ type Expression interface { GetCollectionID() Expression // GetDocumentID creates an expression that returns the ID of the document. GetDocumentID() Expression + // GetField creates an expression that accesses a field/property of a document field using the provided key. + // + // The parameter 'key' can be a string constant or an [Expression] that evaluates to a string. + GetField(key any) Expression // Logical functions // IfError creates an expression that evaluates and returns the receiver expression if it does not produce an error; @@ -506,7 +510,7 @@ type Expression interface { // As assigns an alias to an expression. // Aliases are useful for renaming fields in the output of a stage. - As(alias string) Selectable + As(alias string) *AliasedExpression } // baseExpression provides common methods for all Expr implementations, allowing for method chaining. @@ -645,6 +649,7 @@ func (b *baseExpression) Concat(others ...any) Expression { return Concat(b, oth // Key functions func (b *baseExpression) GetCollectionID() Expression { return GetCollectionID(b) } func (b *baseExpression) GetDocumentID() Expression { return GetDocumentID(b) } +func (b *baseExpression) GetField(key any) Expression { return GetField(b, key) } // Logical functions func (b *baseExpression) IfError(catchExprOrValue any) Expression { @@ -755,7 +760,7 @@ func (b *baseExpression) VectorLength() Expression { return Vector func (b *baseExpression) Ascending() Ordering { return Ascending(b) } func (b *baseExpression) Descending() Ordering { return Descending(b) } -func (b *baseExpression) As(alias string) Selectable { +func (b *baseExpression) As(alias string) *AliasedExpression { return newAliasedExpr(b, alias) } diff --git a/firestore/pipeline_function.go b/firestore/pipeline_function.go index a4f84a40c290..f455abb87be5 100644 --- a/firestore/pipeline_function.go +++ b/firestore/pipeline_function.go @@ -390,7 +390,18 @@ func CurrentTimestamp() Expression { return newBaseFunction("current_timestamp", []Expression{}) } -// CurrentDocument creates an expression that returns the current document. +// CurrentDocument creates an expression that represents the current document being processed. +// +// This expression is useful when you need to access the entire document as a map, or pass the +// document itself to a function or subquery. +// +// Example: +// +// // Define the current document as a variable "doc" +// client.Pipeline().Collection("books"). +// Define(AliasedExpressions(CurrentDocument().As("doc"))). +// // Access a field from the defined document variable +// Select(Fields(GetField(Variable("doc"), "title"))) // // Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, // regardless of any other documented package stability guarantees. @@ -398,6 +409,31 @@ func CurrentDocument() Expression { return newBaseFunction("current_document", []Expression{}) } +// Variable creates an expression that retrieves the value of a variable bound via Define. +// +// Example: +// +// // Define a variable "discountedPrice" and use it in a filter +// client.Pipeline().Collection("products"). +// Define(AliasedExpressions(Multiply("price", 0.9).As("discountedPrice"))). +// Where(LessThan(Variable("discountedPrice"), 100)) +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +func Variable(name string) Expression { + pbVal := &pb.Value{ValueType: &pb.Value_VariableReferenceValue{VariableReferenceValue: name}} + return &baseExpression{pbVal: pbVal} +} + +// Scalar converts a Pipeline into an expression that evaluates to a single scalar result. +// Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object. +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +func Scalar(pipeline *Pipeline) Expression { + return newBaseFunction("scalar", []Expression{newPipelineValueExpression(pipeline)}) +} + // ArrayLength creates an expression that calculates the length of an array. // - exprOrFieldPath can be a field path string, [FieldPath] or an [Expression] that evaluates to an array. // @@ -912,6 +948,16 @@ func GetDocumentID(exprStringOrDocRef any) Expression { return newBaseFunction("document_id", []Expression{expr}) } +// GetField creates an expression that accesses a field/property of a document field using the provided key. +// - exprOrField: The expression representing the document or map. +// - key: The key of the field to access. +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +func GetField(exprOrField any, key any) Expression { + return newBaseFunction("get_field", []Expression{asFieldExpr(exprOrField), asStringExpr(key)}) +} + // Conditional creates an expression that evaluates a condition and returns one of two expressions. // - condition is the boolean expression to evaluate. // - thenVal is the expression to return if the condition is true. diff --git a/firestore/pipeline_function_test.go b/firestore/pipeline_function_test.go index e00fc52efe85..900ff557a9e0 100644 --- a/firestore/pipeline_function_test.go +++ b/firestore/pipeline_function_test.go @@ -1080,3 +1080,41 @@ func TestArrayFunctions(t *testing.T) { }) } } + +func TestGetFieldVariations(t *testing.T) { + // 1. (e Expression) GetField(string) + expr1 := FieldOf("doc").GetField("title") + if expr1 == nil { + t.Fatal("expected expr1 not to be nil") + } + + // 2. (e Expression) GetField(Expression) + expr2 := FieldOf("doc").GetField(ConstantOf("title")) + if expr2 == nil { + t.Fatal("expected expr2 not to be nil") + } + + // 3. GetField(string, string) + expr3 := GetField("doc", "title") + if expr3 == nil { + t.Fatal("expected expr3 not to be nil") + } + + // 4. GetField(Expression, Expression) + expr4 := GetField(Variable("doc"), ConstantOf("title")) + if expr4 == nil { + t.Fatal("expected expr4 not to be nil") + } + + // 5. GetField(string, Expression) + expr5 := GetField("doc", ConstantOf("title")) + if expr5 == nil { + t.Fatal("expected expr5 not to be nil") + } + + // 6. GetField(Expression, string) + expr6 := GetField(Variable("doc"), "title") + if expr6 == nil { + t.Fatal("expected expr6 not to be nil") + } +} diff --git a/firestore/pipeline_integration_test.go b/firestore/pipeline_integration_test.go index 92a4b81f84c7..139d2a0b84b1 100644 --- a/firestore/pipeline_integration_test.go +++ b/firestore/pipeline_integration_test.go @@ -593,8 +593,8 @@ func TestIntegration_PipelineStages(t *testing.T) { } }) t.Run("Sample", func(t *testing.T) { - t.Run("SampleByDocuments", func(t *testing.T) { - iter := client.Pipeline().Collection(coll.ID).Sample(ByDocuments(5)).Execute(ctx).Results() + t.Run("SampleWithDocLimit", func(t *testing.T) { + iter := client.Pipeline().Collection(coll.ID).Sample(WithDocLimit(5)).Execute(ctx).Results() defer iter.Stop() var got []map[string]interface{} for { @@ -615,8 +615,8 @@ func TestIntegration_PipelineStages(t *testing.T) { t.Errorf("got %d documents, want 5", len(got)) } }) - t.Run("SampleByPercentage", func(t *testing.T) { - iter := client.Pipeline().Collection(coll.ID).Sample(ByPercentage(0.6)).Execute(ctx).Results() + t.Run("SampleWithPercentage", func(t *testing.T) { + iter := client.Pipeline().Collection(coll.ID).Sample(WithPercentage(0.6)).Execute(ctx).Results() defer iter.Stop() var got []map[string]interface{} for { @@ -2966,3 +2966,162 @@ func logicalFuncs(t *testing.T) { }) } } + +func TestIntegration_PipelineSubqueriesAndVariables(t *testing.T) { + skipIfNotEnterprise(t) + ctx := context.Background() + client := integrationClient(t) + coll := integrationColl(t) + + // Create test documents + restaurantsRef := coll.NewDoc() + _, err := restaurantsRef.Create(ctx, map[string]interface{}{ + "name": "The Burger Joint", + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + restaurantsRef.Delete(ctx) + }) + + review1Ref := restaurantsRef.Collection("reviews").NewDoc() + _, err = review1Ref.Create(ctx, map[string]interface{}{ + "reviewer": "Alice", + "rating": 5, + }) + if err != nil { + t.Fatal(err) + } + + review2Ref := restaurantsRef.Collection("reviews").NewDoc() + _, err = review2Ref.Create(ctx, map[string]interface{}{ + "reviewer": "Bob", + "rating": 4, + }) + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + review1Ref.Delete(ctx) + review2Ref.Delete(ctx) + }) + + productsRef := coll.NewDoc() + _, err = productsRef.Create(ctx, map[string]interface{}{ + "name": "Widget", + "price": 100, + "stock": 20, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + productsRef.Delete(ctx) + }) + + t.Run("SubcollectionAndScalar", func(t *testing.T) { + iter := client.Pipeline().Documents([]*DocumentRef{restaurantsRef}). + AddFields(Selectables( + Subcollection("reviews"). + Aggregate(Accumulators(Average("rating").As("avg_score"))). + ToScalarExpression().As("stats"), + )).Execute(ctx).Results() + + res, err := iter.GetAll() + if err != nil { + t.Fatal(err) + } + if len(res) != 1 { + t.Fatalf("expected 1 doc, got %d", len(res)) + } + + data := res[0].Data() + stats, ok := data["stats"].(float64) + if !ok { + t.Fatalf("expected stats to be float64, got %T", data["stats"]) + } + if stats != 4.5 { + t.Errorf("expected stats 4.5, got %v", stats) + } + }) + + t.Run("SubcollectionAndArray", func(t *testing.T) { + iter := client.Pipeline().Documents([]*DocumentRef{restaurantsRef}). + AddFields(Selectables( + Subcollection("reviews"). + Select(Fields("reviewer", "rating")). + Sort(Orders(Ascending(FieldOf("reviewer")))). + ToArrayExpression().As("reviews"), + )).Execute(ctx).Results() + + res, err := iter.GetAll() + if err != nil { + t.Fatal(err) + } + if len(res) != 1 { + t.Fatalf("expected 1 doc, got %d", len(res)) + } + + data := res[0].Data() + reviews, ok := data["reviews"].([]interface{}) + if !ok { + t.Fatalf("expected reviews to be array, got %T", data["reviews"]) + } + if len(reviews) != 2 { + t.Fatalf("expected 2 reviews, got %d", len(reviews)) + } + r1 := reviews[0].(map[string]interface{}) + if r1["reviewer"] != "Alice" || r1["rating"].(int64) != 5 { + t.Errorf("expected Alice with rating 5, got %v", r1) + } + }) + + t.Run("DefineAndVariable", func(t *testing.T) { + iter := client.Pipeline().Documents([]*DocumentRef{productsRef}). + Define(AliasedExpressions( + Multiply("price", 0.9).As("discountedPrice"), + Add("stock", 10).As("newStock"), + )). + Where(LessThan(Variable("discountedPrice"), 100)). + Select(Fields("name", Variable("newStock").As("newStock"))). + Execute(ctx).Results() + + res, err := iter.GetAll() + if err != nil { + t.Fatal(err) + } + if len(res) != 1 { + t.Fatalf("expected 1 doc, got %d", len(res)) + } + + data := res[0].Data() + if data["name"] != "Widget" { + t.Errorf("expected name Widget, got %v", data["name"]) + } + if data["newStock"].(int64) != 30 { + t.Errorf("expected newStock 30, got %v", data["newStock"]) + } + }) + + t.Run("CurrentDocument", func(t *testing.T) { + iter := client.Pipeline().Documents([]*DocumentRef{productsRef}). + Define(AliasedExpressions(CurrentDocument().As("doc"))). + Select(Fields(MapGet(Variable("doc"), "name").As("name"))). + Execute(ctx).Results() + + res, err := iter.GetAll() + if err != nil { + t.Fatal(err) + } + if len(res) != 1 { + t.Fatalf("expected 1 doc, got %d", len(res)) + } + + data := res[0].Data() + if data["name"] != "Widget" { + t.Errorf("expected name Widget, got %v", data["name"]) + } + }) +} diff --git a/firestore/pipeline_source.go b/firestore/pipeline_source.go index f549ffed902f..b696b7d51415 100644 --- a/firestore/pipeline_source.go +++ b/firestore/pipeline_source.go @@ -211,3 +211,26 @@ func (ps *PipelineSource) Literals(documents []map[string]any, opts ...LiteralsO } return newPipeline(ps.client, newInputStageLiterals(documents, options)) } + +// Subcollection creates a new [Pipeline] that operates on a subcollection of the current document. +// +// This method allows you to start a new pipeline that operates on a subcollection of the +// current document. It is intended to be used as a subquery. +// +// Note: A pipeline created with `Subcollection` cannot be executed directly using +// [Pipeline.Execute]. It must be used within a parent pipeline. +// +// Example: +// +// client.Pipeline().Collection("books"). +// AddFields(Selectables( +// Subcollection("reviews"). +// Aggregate(Accumulators(Average("rating").As("avg_rating"))). +// ToScalarExpression().As("average_rating"), +// )) +// +// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, +// regardless of any other documented package stability guarantees. +func Subcollection(path string) *Pipeline { + return newPipeline(nil, newInputStageSubcollection(path)) +} diff --git a/firestore/pipeline_stage.go b/firestore/pipeline_stage.go index 84a81e6bb1a3..60da57da87d8 100644 --- a/firestore/pipeline_stage.go +++ b/firestore/pipeline_stage.go @@ -48,6 +48,7 @@ const ( stageNameUnnest = "unnest" stageNameUpdate = "update" stageNameWhere = "where" + stageNameDefine = "let" ) // internal interface for pipeline stages. @@ -168,6 +169,22 @@ func (s *inputStageDocuments) toProto() (*pb.Pipeline_Stage, error) { }, nil } +// inputStageSubcollection returns a pipeline starting from a subcollection of the current document. +type inputStageSubcollection struct { + path string +} + +func newInputStageSubcollection(path string) *inputStageSubcollection { + return &inputStageSubcollection{path: path} +} +func (s *inputStageSubcollection) name() string { return "subcollection" } +func (s *inputStageSubcollection) toProto() (*pb.Pipeline_Stage, error) { + return &pb.Pipeline_Stage{ + Name: s.name(), + Args: []*pb.Value{{ValueType: &pb.Value_StringValue{StringValue: s.path}}}, + }, nil +} + // inputStageLiterals returns a fixed set of documents. type inputStageLiterals struct { documents []map[string]any @@ -198,6 +215,35 @@ func (s *inputStageLiterals) toProto() (*pb.Pipeline_Stage, error) { }, nil } +type defineStage struct { + variables []*AliasedExpression + options map[string]any +} + +func newDefineStage(variables []*AliasedExpression, options map[string]any) (*defineStage, error) { + return &defineStage{variables: variables, options: options}, nil +} +func (s *defineStage) name() string { return stageNameDefine } +func (s *defineStage) toProto() (*pb.Pipeline_Stage, error) { + selectables := make([]Selectable, len(s.variables)) + for i, v := range s.variables { + selectables[i] = v + } + mapVal, err := projectionsToMapValue(selectables) + if err != nil { + return nil, err + } + optionsPb, err := stageOptionsToProto(s.options) + if err != nil { + return nil, err + } + return &pb.Pipeline_Stage{ + Name: s.name(), + Args: []*pb.Value{mapVal}, + Options: optionsPb, + }, nil +} + // addFieldsStage is the internal representation of an AddFields stage. type addFieldsStage struct { fields []Selectable diff --git a/firestore/pipeline_stage_test.go b/firestore/pipeline_stage_test.go index 4ef534fddd03..931c6383c1a5 100644 --- a/firestore/pipeline_stage_test.go +++ b/firestore/pipeline_stage_test.go @@ -344,7 +344,7 @@ func TestReplaceStage(t *testing.T) { } func TestSampleStage(t *testing.T) { - spec := ByDocuments(100) + spec := WithDocLimit(100) stage, err := newSampleStage(spec, nil) if err != nil { t.Fatalf("newSampleStage() failed: %v", err) diff --git a/firestore/pipeline_utils.go b/firestore/pipeline_utils.go index f6e0a0748013..c158ed1b4e88 100644 --- a/firestore/pipeline_utils.go +++ b/firestore/pipeline_utils.go @@ -118,6 +118,14 @@ func toExprOrConstant(val any) Expression { return ConstantOf(val) } +func newPipelineValueExpression(p *Pipeline) Expression { + pbVal, err := p.toProto() + if err != nil { + return &baseExpression{err: err} + } + return &baseExpression{pbVal: &pb.Value{ValueType: &pb.Value_PipelineValue{PipelineValue: pbVal}}} +} + // asFieldExpr converts a plain Go string or FieldPath into a field expression. // If the value is already an Expr, it's returned directly. func asFieldExpr(val any) Expression { From 24ffae5e0b48495dc85eb9c58c1c76d429bd2d79 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Mon, 6 Apr 2026 21:47:07 +0000 Subject: [PATCH 2/2] fix docs --- firestore/pipeline.go | 18 ++++++++++-------- firestore/pipeline_function.go | 9 --------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/firestore/pipeline.go b/firestore/pipeline.go index b3eeaa2a72f5..61d373df15b3 100644 --- a/firestore/pipeline.go +++ b/firestore/pipeline.go @@ -254,7 +254,7 @@ func (p *Pipeline) Execute(ctx context.Context, opts ...ExecuteOption) *Pipeline } if newP.c == nil { - newP.err = fmt.Errorf("This pipeline was created without a database (e.g., as a subcollection pipeline) and cannot be executed directly. It can only be used as part of another pipeline.") + newP.err = fmt.Errorf("pipeline created without a database (e.g., as a subcollection pipeline) cannot be executed directly; it can only be used as part of another pipeline") return &PipelineSnapshot{ iter: &PipelineResultIterator{ err: newP.err, @@ -898,7 +898,7 @@ func (p *Pipeline) Union(other *Pipeline, opts ...UnionOption) *Pipeline { return p } if other.c == nil { - p.err = fmt.Errorf("Union only supports combining root pipelines, doesn't support relative scope Pipeline like relative subcollection pipeline") + p.err = fmt.Errorf("union only supports combining root pipelines; relative scope pipelines (like subcollection pipelines) are not supported") return p } options := make(map[string]any) @@ -1267,7 +1267,7 @@ func (p *Pipeline) Delete(opts ...DeleteOption) *Pipeline { return p.append(stage) } -// Scalar converts this Pipeline into an expression that evaluates to a single scalar result. +// ToScalarExpression converts this Pipeline into an expression that evaluates to a single scalar result. // Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object. // // Example: @@ -1277,7 +1277,7 @@ func (p *Pipeline) Delete(opts ...DeleteOption) *Pipeline { // AddFields(Selectables( // Subcollection("reviews"). // Aggregate(Accumulators(Average("rating").As("avg_score"))). -// Scalar().As("stats"), +// ToScalarExpression().As("stats"), // )) // // Output format: // // [ @@ -1333,13 +1333,15 @@ type DefineOption interface { isDefineOption() } -// Define adds a "let" stage to the pipeline to define variables. +// Define defines one or more variables in the pipeline's scope. `Define` is used to bind a value to a +// variable for internal reuse within the pipeline body (accessed via the [Variable] function). // -// Defines one or more variables in the pipeline's scope. `Define` is used to bind a value to a -// name, which can be referenced in subsequent stages using [Variable]. +// This stage is useful for declaring reusable values or intermediate calculations that can be +// referenced multiple times in later parts of the pipeline, improving readability and +// maintainability. // // Each variable is defined using an [AliasedExpression], which pairs an expression with -// its alias (the variable name). +// a name (alias). // // Example: // diff --git a/firestore/pipeline_function.go b/firestore/pipeline_function.go index f455abb87be5..1aa441951349 100644 --- a/firestore/pipeline_function.go +++ b/firestore/pipeline_function.go @@ -425,15 +425,6 @@ func Variable(name string) Expression { return &baseExpression{pbVal: pbVal} } -// Scalar converts a Pipeline into an expression that evaluates to a single scalar result. -// Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object. -// -// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions, -// regardless of any other documented package stability guarantees. -func Scalar(pipeline *Pipeline) Expression { - return newBaseFunction("scalar", []Expression{newPipelineValueExpression(pipeline)}) -} - // ArrayLength creates an expression that calculates the length of an array. // - exprOrFieldPath can be a field path string, [FieldPath] or an [Expression] that evaluates to an array. //