Skip to content

Commit c40e42f

Browse files
authored
feat(firestore): pipeline subqueries (#14365)
**Overview** This PR introduces support for pipeline subqueries, variable definitions, and joins in the Go Firestore SDK, achieving strict 1:1 feature parity with the Java SDK's pipeline APIs googleapis/java-firestore#2323. **What are Subqueries?** In Firestore pipelines, subqueries allow you to embed an entire pipeline execution as a value within a single stage of an outer pipeline. This is incredibly powerful for performing complex "join-like" operations across different collections. For example, while querying a restaurants collection, you can use a subquery to fetch, filter, and aggregate all documents from a nested reviews subcollection, and embed that aggregated result (e.g., average_rating) directly into the restaurant document being returned. Subqueries can be evaluated into either an array of results (ToArrayExpression()) or a single scalar value (ToScalarExpression()). **Key Features & API Additions:** * **Subqueries (Joins):** * Implemented the Subcollection(path string) package-level function to instantiate relative-scope pipelines. * Added ToScalarExpression() and ToArrayExpression() methods on Pipeline to explicitly convert subqueries into expressions, allowing them to be seamlessly embedded inside stages like AddFields and Where. * **Variable Definition & References:** * Introduced the Define pipeline stage and the AliasedExpressions variadic helper to ergonomically bind values to variables. * Added Variable("name") and CurrentDocument() top-level functions to reference bound values in subsequent pipeline stages. * **Field Access & Overloading:** * Implemented GetField (accepting any to support both string and Expression arguments, mirroring Java's method overloading). **Type Safety & Defensive Constraints:** * Strict Aliasing: Expression.As() now explicitly returns *AliasedExpression (similar to Java: https://github.com/googleapis/java-firestore/blob/0c8188520dfbada0d3fef0719e4f95fc231306be/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Expression.java#L6608-L6621) rather than a generic Selectable interface, and the Define stage strictly requires []*AliasedExpression. * Pipeline Scope Validation: Added validation to ensure relative-scope pipelines (e.g., those created via Subcollection) cannot be executed directly or passed into a Union() stage. Attempts to do so now return descriptive errors identical to the Java SDK's IllegalStateException and IllegalArgumentException.
1 parent 9886dcf commit c40e42f

9 files changed

Lines changed: 462 additions & 15 deletions

firestore/pipeline.go

Lines changed: 138 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ func (RawOptions) isDocumentsOption() {}
182182

183183
func (RawOptions) isLiteralsOption() {}
184184

185+
func (RawOptions) isDefineOption() {}
186+
185187
func (r RawOptions) apply(eo *executeSettings) {
186188
if eo.RawOptions == nil {
187189
eo.RawOptions = make(map[string]any)
@@ -227,6 +229,15 @@ func Selectables(s ...Selectable) []Selectable {
227229
return []Selectable(s)
228230
}
229231

232+
// AliasedExpressions is a helper function that returns its arguments as a slice of *AliasedExpression.
233+
// It is used to provide variadic-like ergonomics for the [Pipeline.Define] pipeline stage.
234+
//
235+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
236+
// regardless of any other documented package stability guarantees.
237+
func AliasedExpressions(v ...*AliasedExpression) []*AliasedExpression {
238+
return v
239+
}
240+
230241
// Execute executes the pipeline and returns a snapshot of the results.
231242
//
232243
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
@@ -242,6 +253,15 @@ func (p *Pipeline) Execute(ctx context.Context, opts ...ExecuteOption) *Pipeline
242253
}
243254
}
244255

256+
if newP.c == nil {
257+
newP.err = fmt.Errorf("pipeline created without a database (e.g., as a subcollection pipeline) cannot be executed directly; it can only be used as part of another pipeline")
258+
return &PipelineSnapshot{
259+
iter: &PipelineResultIterator{
260+
err: newP.err,
261+
},
262+
}
263+
}
264+
245265
ctx = withResourceHeader(ctx, newP.c.path())
246266
ctx = withRequestParamsHeader(ctx, reqParamsHeaderVal(newP.c.path()))
247267

@@ -877,6 +897,10 @@ func (p *Pipeline) Union(other *Pipeline, opts ...UnionOption) *Pipeline {
877897
if p.err != nil {
878898
return p
879899
}
900+
if other.c == nil {
901+
p.err = fmt.Errorf("union only supports combining root pipelines; relative scope pipelines (like subcollection pipelines) are not supported")
902+
return p
903+
}
880904
options := make(map[string]any)
881905
for _, opt := range opts {
882906
if opt != nil {
@@ -913,19 +937,19 @@ type Sampler struct {
913937
Mode SampleMode
914938
}
915939

916-
// ByDocuments creates a Sampler for sampling a fixed number of documents.
940+
// WithDocLimit creates a Sampler for sampling a fixed number of documents.
917941
//
918942
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
919943
// regardless of any other documented package stability guarantees.
920-
func ByDocuments(limit int) *Sampler {
944+
func WithDocLimit(limit int) *Sampler {
921945
return &Sampler{Size: limit, Mode: SampleModeDocuments}
922946
}
923947

924-
// ByPercentage creates a Sampler for sampling a percentage of documents.
948+
// WithPercentage creates a Sampler for sampling a percentage of documents.
925949
//
926950
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
927951
// regardless of any other documented package stability guarantees.
928-
func ByPercentage(percentage float64) *Sampler {
952+
func WithPercentage(percentage float64) *Sampler {
929953
return &Sampler{Size: percentage, Mode: SampleModePercent}
930954
}
931955

@@ -941,15 +965,15 @@ type SampleOption interface {
941965
// Sample performs a pseudo-random sampling of the documents from the previous stage.
942966
//
943967
// This stage will filter documents pseudo-randomly. The behavior is defined by the Sampler.
944-
// Use ByDocuments or ByPercentage to create a Sampler.
968+
// Use WithDocLimit or WithPercentage to create a Sampler.
945969
//
946970
// Example:
947971
//
948972
// // Sample 10 books, if available.
949-
// client.Pipeline().Collection("books").Sample(ByDocuments(10))
973+
// client.Pipeline().Collection("books").Sample(WithDocLimit(10))
950974
//
951975
// // Sample 50% of books.
952-
// client.Pipeline().Collection("books").Sample(ByPercentage(0.5))
976+
// client.Pipeline().Collection("books").Sample(WithPercentage(0.5))
953977
//
954978
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
955979
// regardless of any other documented package stability guarantees.
@@ -1242,3 +1266,110 @@ func (p *Pipeline) Delete(opts ...DeleteOption) *Pipeline {
12421266
stage := newDeleteStage(options)
12431267
return p.append(stage)
12441268
}
1269+
1270+
// ToScalarExpression converts this Pipeline into an expression that evaluates to a single scalar result.
1271+
// Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object.
1272+
//
1273+
// Example:
1274+
//
1275+
// // Calculate average rating for each restaurant using a subquery
1276+
// client.Pipeline().Collection("restaurants").
1277+
// AddFields(Selectables(
1278+
// Subcollection("reviews").
1279+
// Aggregate(Accumulators(Average("rating").As("avg_score"))).
1280+
// ToScalarExpression().As("stats"),
1281+
// ))
1282+
// // Output format:
1283+
// // [
1284+
// // {
1285+
// // "name": "The Burger Joint",
1286+
// // "stats": {
1287+
// // "avg_score": 4.8,
1288+
// // "review_count": 120
1289+
// // }
1290+
// // }
1291+
// // ]
1292+
//
1293+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1294+
// regardless of any other documented package stability guarantees.
1295+
func (p *Pipeline) ToScalarExpression() Expression {
1296+
return newBaseFunction("scalar", []Expression{newPipelineValueExpression(p)})
1297+
}
1298+
1299+
// ToArrayExpression converts this Pipeline into an expression that evaluates to an array result.
1300+
//
1301+
// Example:
1302+
//
1303+
// // Embed a subcollection of reviews as an array into each restaurant document
1304+
// client.Pipeline().Collection("restaurants").
1305+
// AddFields(Selectables(
1306+
// Subcollection("reviews").
1307+
// Select(Fields("reviewer", "rating")).
1308+
// ToArrayExpression().As("reviews"),
1309+
// ))
1310+
// // Output format:
1311+
// // [
1312+
// // {
1313+
// // "name": "The Burger Joint",
1314+
// // "reviews": [
1315+
// // { "reviewer": "Alice", "rating": 5 },
1316+
// // { "reviewer": "Bob", "rating": 4 }
1317+
// // ]
1318+
// // }
1319+
// // ]
1320+
//
1321+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1322+
// regardless of any other documented package stability guarantees.
1323+
func (p *Pipeline) ToArrayExpression() Expression {
1324+
return newBaseFunction("array", []Expression{newPipelineValueExpression(p)})
1325+
}
1326+
1327+
// DefineOption is an option for a Define pipeline stage.
1328+
//
1329+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1330+
// regardless of any other documented package stability guarantees.
1331+
type DefineOption interface {
1332+
StageOption
1333+
isDefineOption()
1334+
}
1335+
1336+
// Define defines one or more variables in the pipeline's scope. `Define` is used to bind a value to a
1337+
// variable for internal reuse within the pipeline body (accessed via the [Variable] function).
1338+
//
1339+
// This stage is useful for declaring reusable values or intermediate calculations that can be
1340+
// referenced multiple times in later parts of the pipeline, improving readability and
1341+
// maintainability.
1342+
//
1343+
// Each variable is defined using an [AliasedExpression], which pairs an expression with
1344+
// a name (alias).
1345+
//
1346+
// Example:
1347+
//
1348+
// // Define a variable and use it in a filter
1349+
// client.Pipeline().Collection("products").
1350+
// Define(AliasedExpressions(
1351+
// Multiply("price", 0.9).As("discountedPrice"),
1352+
// Add("stock", 10).As("newStock"),
1353+
// )).
1354+
// Where(LessThan(Variable("discountedPrice"), 100)).
1355+
// Select(Fields("name", Variable("newStock")))
1356+
//
1357+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1358+
// regardless of any other documented package stability guarantees.
1359+
func (p *Pipeline) Define(variables []*AliasedExpression, opts ...DefineOption) *Pipeline {
1360+
if p.err != nil {
1361+
return p
1362+
}
1363+
options := make(map[string]any)
1364+
for _, opt := range opts {
1365+
if opt != nil {
1366+
opt.applyStage(options)
1367+
}
1368+
}
1369+
stage, err := newDefineStage(variables, options)
1370+
if err != nil {
1371+
p.err = err
1372+
return p
1373+
}
1374+
return p.append(stage)
1375+
}

firestore/pipeline_expression.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,10 @@ type Expression interface {
316316
GetCollectionID() Expression
317317
// GetDocumentID creates an expression that returns the ID of the document.
318318
GetDocumentID() Expression
319+
// GetField creates an expression that accesses a field/property of a document field using the provided key.
320+
//
321+
// The parameter 'key' can be a string constant or an [Expression] that evaluates to a string.
322+
GetField(key any) Expression
319323

320324
// Logical functions
321325
// IfError creates an expression that evaluates and returns the receiver expression if it does not produce an error;
@@ -506,7 +510,7 @@ type Expression interface {
506510

507511
// As assigns an alias to an expression.
508512
// Aliases are useful for renaming fields in the output of a stage.
509-
As(alias string) Selectable
513+
As(alias string) *AliasedExpression
510514
}
511515

512516
// baseExpression provides common methods for all Expr implementations, allowing for method chaining.
@@ -645,6 +649,7 @@ func (b *baseExpression) Concat(others ...any) Expression { return Concat(b, oth
645649
// Key functions
646650
func (b *baseExpression) GetCollectionID() Expression { return GetCollectionID(b) }
647651
func (b *baseExpression) GetDocumentID() Expression { return GetDocumentID(b) }
652+
func (b *baseExpression) GetField(key any) Expression { return GetField(b, key) }
648653

649654
// Logical functions
650655
func (b *baseExpression) IfError(catchExprOrValue any) Expression {
@@ -755,7 +760,7 @@ func (b *baseExpression) VectorLength() Expression { return Vector
755760
func (b *baseExpression) Ascending() Ordering { return Ascending(b) }
756761
func (b *baseExpression) Descending() Ordering { return Descending(b) }
757762

758-
func (b *baseExpression) As(alias string) Selectable {
763+
func (b *baseExpression) As(alias string) *AliasedExpression {
759764
return newAliasedExpr(b, alias)
760765
}
761766

firestore/pipeline_function.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,14 +390,41 @@ func CurrentTimestamp() Expression {
390390
return newBaseFunction("current_timestamp", []Expression{})
391391
}
392392

393-
// CurrentDocument creates an expression that returns the current document.
393+
// CurrentDocument creates an expression that represents the current document being processed.
394+
//
395+
// This expression is useful when you need to access the entire document as a map, or pass the
396+
// document itself to a function or subquery.
397+
//
398+
// Example:
399+
//
400+
// // Define the current document as a variable "doc"
401+
// client.Pipeline().Collection("books").
402+
// Define(AliasedExpressions(CurrentDocument().As("doc"))).
403+
// // Access a field from the defined document variable
404+
// Select(Fields(GetField(Variable("doc"), "title")))
394405
//
395406
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
396407
// regardless of any other documented package stability guarantees.
397408
func CurrentDocument() Expression {
398409
return newBaseFunction("current_document", []Expression{})
399410
}
400411

412+
// Variable creates an expression that retrieves the value of a variable bound via Define.
413+
//
414+
// Example:
415+
//
416+
// // Define a variable "discountedPrice" and use it in a filter
417+
// client.Pipeline().Collection("products").
418+
// Define(AliasedExpressions(Multiply("price", 0.9).As("discountedPrice"))).
419+
// Where(LessThan(Variable("discountedPrice"), 100))
420+
//
421+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
422+
// regardless of any other documented package stability guarantees.
423+
func Variable(name string) Expression {
424+
pbVal := &pb.Value{ValueType: &pb.Value_VariableReferenceValue{VariableReferenceValue: name}}
425+
return &baseExpression{pbVal: pbVal}
426+
}
427+
401428
// ArrayLength creates an expression that calculates the length of an array.
402429
// - exprOrFieldPath can be a field path string, [FieldPath] or an [Expression] that evaluates to an array.
403430
//
@@ -912,6 +939,16 @@ func GetDocumentID(exprStringOrDocRef any) Expression {
912939
return newBaseFunction("document_id", []Expression{expr})
913940
}
914941

942+
// GetField creates an expression that accesses a field/property of a document field using the provided key.
943+
// - exprOrField: The expression representing the document or map.
944+
// - key: The key of the field to access.
945+
//
946+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
947+
// regardless of any other documented package stability guarantees.
948+
func GetField(exprOrField any, key any) Expression {
949+
return newBaseFunction("get_field", []Expression{asFieldExpr(exprOrField), asStringExpr(key)})
950+
}
951+
915952
// Conditional creates an expression that evaluates a condition and returns one of two expressions.
916953
// - condition is the boolean expression to evaluate.
917954
// - thenVal is the expression to return if the condition is true.

firestore/pipeline_function_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,3 +1080,41 @@ func TestArrayFunctions(t *testing.T) {
10801080
})
10811081
}
10821082
}
1083+
1084+
func TestGetFieldVariations(t *testing.T) {
1085+
// 1. (e Expression) GetField(string)
1086+
expr1 := FieldOf("doc").GetField("title")
1087+
if expr1 == nil {
1088+
t.Fatal("expected expr1 not to be nil")
1089+
}
1090+
1091+
// 2. (e Expression) GetField(Expression)
1092+
expr2 := FieldOf("doc").GetField(ConstantOf("title"))
1093+
if expr2 == nil {
1094+
t.Fatal("expected expr2 not to be nil")
1095+
}
1096+
1097+
// 3. GetField(string, string)
1098+
expr3 := GetField("doc", "title")
1099+
if expr3 == nil {
1100+
t.Fatal("expected expr3 not to be nil")
1101+
}
1102+
1103+
// 4. GetField(Expression, Expression)
1104+
expr4 := GetField(Variable("doc"), ConstantOf("title"))
1105+
if expr4 == nil {
1106+
t.Fatal("expected expr4 not to be nil")
1107+
}
1108+
1109+
// 5. GetField(string, Expression)
1110+
expr5 := GetField("doc", ConstantOf("title"))
1111+
if expr5 == nil {
1112+
t.Fatal("expected expr5 not to be nil")
1113+
}
1114+
1115+
// 6. GetField(Expression, string)
1116+
expr6 := GetField(Variable("doc"), "title")
1117+
if expr6 == nil {
1118+
t.Fatal("expected expr6 not to be nil")
1119+
}
1120+
}

0 commit comments

Comments
 (0)