Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 138 additions & 7 deletions firestore/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func (RawOptions) isDocumentsOption() {}

func (RawOptions) isLiteralsOption() {}

func (RawOptions) isDefineOption() {}

func (r RawOptions) apply(eo *executeSettings) {
if eo.RawOptions == nil {
eo.RawOptions = make(map[string]any)
Expand Down Expand Up @@ -227,6 +229,15 @@ func Selectables(s ...Selectable) []Selectable {
return []Selectable(s)
}

// AliasedExpressions is a helper function that returns its arguments as a slice of *AliasedExpression.
// It is used to provide variadic-like ergonomics for the [Pipeline.Define] pipeline stage.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func AliasedExpressions(v ...*AliasedExpression) []*AliasedExpression {
return v
}

// Execute executes the pipeline and returns a snapshot of the results.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
Expand All @@ -242,6 +253,15 @@ func (p *Pipeline) Execute(ctx context.Context, opts ...ExecuteOption) *Pipeline
}
}

if newP.c == nil {
newP.err = fmt.Errorf("pipeline created without a database (e.g., as a subcollection pipeline) cannot be executed directly; it can only be used as part of another pipeline")
return &PipelineSnapshot{
iter: &PipelineResultIterator{
err: newP.err,
},
}
}

ctx = withResourceHeader(ctx, newP.c.path())
ctx = withRequestParamsHeader(ctx, reqParamsHeaderVal(newP.c.path()))

Expand Down Expand Up @@ -877,6 +897,10 @@ func (p *Pipeline) Union(other *Pipeline, opts ...UnionOption) *Pipeline {
if p.err != nil {
return p
}
if other.c == nil {
p.err = fmt.Errorf("union only supports combining root pipelines; relative scope pipelines (like subcollection pipelines) are not supported")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One ergonomic factor to consider for the future: would users benefit from having these kind of errors exported for easier checking via errors.Is or similar?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in #14429

return p
}
options := make(map[string]any)
for _, opt := range opts {
if opt != nil {
Expand Down Expand Up @@ -913,19 +937,19 @@ type Sampler struct {
Mode SampleMode
}

// ByDocuments creates a Sampler for sampling a fixed number of documents.
// WithDocLimit creates a Sampler for sampling a fixed number of documents.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func ByDocuments(limit int) *Sampler {
func WithDocLimit(limit int) *Sampler {
return &Sampler{Size: limit, Mode: SampleModeDocuments}
}

// ByPercentage creates a Sampler for sampling a percentage of documents.
// WithPercentage creates a Sampler for sampling a percentage of documents.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func ByPercentage(percentage float64) *Sampler {
func WithPercentage(percentage float64) *Sampler {
return &Sampler{Size: percentage, Mode: SampleModePercent}
}

Expand All @@ -941,15 +965,15 @@ type SampleOption interface {
// Sample performs a pseudo-random sampling of the documents from the previous stage.
//
// This stage will filter documents pseudo-randomly. The behavior is defined by the Sampler.
// Use ByDocuments or ByPercentage to create a Sampler.
// Use WithDocLimit or WithPercentage to create a Sampler.
//
// Example:
//
// // Sample 10 books, if available.
// client.Pipeline().Collection("books").Sample(ByDocuments(10))
// client.Pipeline().Collection("books").Sample(WithDocLimit(10))
//
// // Sample 50% of books.
// client.Pipeline().Collection("books").Sample(ByPercentage(0.5))
// client.Pipeline().Collection("books").Sample(WithPercentage(0.5))
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
Expand Down Expand Up @@ -1242,3 +1266,110 @@ func (p *Pipeline) Delete(opts ...DeleteOption) *Pipeline {
stage := newDeleteStage(options)
return p.append(stage)
}

// ToScalarExpression converts this Pipeline into an expression that evaluates to a single scalar result.
// Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object.
//
// Example:
//
// // Calculate average rating for each restaurant using a subquery
// client.Pipeline().Collection("restaurants").
// AddFields(Selectables(
// Subcollection("reviews").
// Aggregate(Accumulators(Average("rating").As("avg_score"))).
// ToScalarExpression().As("stats"),
// ))
// // Output format:
// // [
// // {
// // "name": "The Burger Joint",
// // "stats": {
// // "avg_score": 4.8,
// // "review_count": 120
// // }
// // }
// // ]
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (p *Pipeline) ToScalarExpression() Expression {
return newBaseFunction("scalar", []Expression{newPipelineValueExpression(p)})
}

// ToArrayExpression converts this Pipeline into an expression that evaluates to an array result.
//
// Example:
//
// // Embed a subcollection of reviews as an array into each restaurant document
// client.Pipeline().Collection("restaurants").
// AddFields(Selectables(
// Subcollection("reviews").
// Select(Fields("reviewer", "rating")).
// ToArrayExpression().As("reviews"),
// ))
// // Output format:
// // [
// // {
// // "name": "The Burger Joint",
// // "reviews": [
// // { "reviewer": "Alice", "rating": 5 },
// // { "reviewer": "Bob", "rating": 4 }
// // ]
// // }
// // ]
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (p *Pipeline) ToArrayExpression() Expression {
return newBaseFunction("array", []Expression{newPipelineValueExpression(p)})
}

// DefineOption is an option for a Define pipeline stage.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
type DefineOption interface {
StageOption
isDefineOption()
}

// Define defines one or more variables in the pipeline's scope. `Define` is used to bind a value to a
// variable for internal reuse within the pipeline body (accessed via the [Variable] function).
//
// This stage is useful for declaring reusable values or intermediate calculations that can be
// referenced multiple times in later parts of the pipeline, improving readability and
// maintainability.
//
// Each variable is defined using an [AliasedExpression], which pairs an expression with
// a name (alias).
//
// Example:
//
// // Define a variable and use it in a filter
// client.Pipeline().Collection("products").
// Define(AliasedExpressions(
// Multiply("price", 0.9).As("discountedPrice"),
// Add("stock", 10).As("newStock"),
// )).
// Where(LessThan(Variable("discountedPrice"), 100)).
// Select(Fields("name", Variable("newStock")))
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func (p *Pipeline) Define(variables []*AliasedExpression, opts ...DefineOption) *Pipeline {
if p.err != nil {
return p
}
options := make(map[string]any)
for _, opt := range opts {
if opt != nil {
opt.applyStage(options)
}
}
stage, err := newDefineStage(variables, options)
if err != nil {
p.err = err
return p
}
return p.append(stage)
}
9 changes: 7 additions & 2 deletions firestore/pipeline_expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ type Expression interface {
GetCollectionID() Expression
// GetDocumentID creates an expression that returns the ID of the document.
GetDocumentID() Expression
// GetField creates an expression that accesses a field/property of a document field using the provided key.
//
// The parameter 'key' can be a string constant or an [Expression] that evaluates to a string.
GetField(key any) Expression

// Logical functions
// IfError creates an expression that evaluates and returns the receiver expression if it does not produce an error;
Expand Down Expand Up @@ -506,7 +510,7 @@ type Expression interface {

// As assigns an alias to an expression.
// Aliases are useful for renaming fields in the output of a stage.
As(alias string) Selectable
As(alias string) *AliasedExpression
}

// baseExpression provides common methods for all Expr implementations, allowing for method chaining.
Expand Down Expand Up @@ -645,6 +649,7 @@ func (b *baseExpression) Concat(others ...any) Expression { return Concat(b, oth
// Key functions
func (b *baseExpression) GetCollectionID() Expression { return GetCollectionID(b) }
func (b *baseExpression) GetDocumentID() Expression { return GetDocumentID(b) }
func (b *baseExpression) GetField(key any) Expression { return GetField(b, key) }

// Logical functions
func (b *baseExpression) IfError(catchExprOrValue any) Expression {
Expand Down Expand Up @@ -755,7 +760,7 @@ func (b *baseExpression) VectorLength() Expression { return Vector
func (b *baseExpression) Ascending() Ordering { return Ascending(b) }
func (b *baseExpression) Descending() Ordering { return Descending(b) }

func (b *baseExpression) As(alias string) Selectable {
func (b *baseExpression) As(alias string) *AliasedExpression {
return newAliasedExpr(b, alias)
}

Expand Down
39 changes: 38 additions & 1 deletion firestore/pipeline_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,14 +390,41 @@ func CurrentTimestamp() Expression {
return newBaseFunction("current_timestamp", []Expression{})
}

// CurrentDocument creates an expression that returns the current document.
// CurrentDocument creates an expression that represents the current document being processed.
//
// This expression is useful when you need to access the entire document as a map, or pass the
// document itself to a function or subquery.
//
// Example:
//
// // Define the current document as a variable "doc"
// client.Pipeline().Collection("books").
// Define(AliasedExpressions(CurrentDocument().As("doc"))).
// // Access a field from the defined document variable
// Select(Fields(GetField(Variable("doc"), "title")))
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func CurrentDocument() Expression {
return newBaseFunction("current_document", []Expression{})
}

// Variable creates an expression that retrieves the value of a variable bound via Define.
//
// Example:
//
// // Define a variable "discountedPrice" and use it in a filter
// client.Pipeline().Collection("products").
// Define(AliasedExpressions(Multiply("price", 0.9).As("discountedPrice"))).
// Where(LessThan(Variable("discountedPrice"), 100))
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func Variable(name string) Expression {
pbVal := &pb.Value{ValueType: &pb.Value_VariableReferenceValue{VariableReferenceValue: name}}
return &baseExpression{pbVal: pbVal}
}

// ArrayLength creates an expression that calculates the length of an array.
// - exprOrFieldPath can be a field path string, [FieldPath] or an [Expression] that evaluates to an array.
//
Expand Down Expand Up @@ -912,6 +939,16 @@ func GetDocumentID(exprStringOrDocRef any) Expression {
return newBaseFunction("document_id", []Expression{expr})
}

// GetField creates an expression that accesses a field/property of a document field using the provided key.
// - exprOrField: The expression representing the document or map.
// - key: The key of the field to access.
//
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
// regardless of any other documented package stability guarantees.
func GetField(exprOrField any, key any) Expression {
return newBaseFunction("get_field", []Expression{asFieldExpr(exprOrField), asStringExpr(key)})
}

// Conditional creates an expression that evaluates a condition and returns one of two expressions.
// - condition is the boolean expression to evaluate.
// - thenVal is the expression to return if the condition is true.
Expand Down
38 changes: 38 additions & 0 deletions firestore/pipeline_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,3 +1080,41 @@ func TestArrayFunctions(t *testing.T) {
})
}
}

func TestGetFieldVariations(t *testing.T) {
// 1. (e Expression) GetField(string)
expr1 := FieldOf("doc").GetField("title")
if expr1 == nil {
t.Fatal("expected expr1 not to be nil")
}

// 2. (e Expression) GetField(Expression)
expr2 := FieldOf("doc").GetField(ConstantOf("title"))
if expr2 == nil {
t.Fatal("expected expr2 not to be nil")
}

// 3. GetField(string, string)
expr3 := GetField("doc", "title")
if expr3 == nil {
t.Fatal("expected expr3 not to be nil")
}

// 4. GetField(Expression, Expression)
expr4 := GetField(Variable("doc"), ConstantOf("title"))
if expr4 == nil {
t.Fatal("expected expr4 not to be nil")
}

// 5. GetField(string, Expression)
expr5 := GetField("doc", ConstantOf("title"))
if expr5 == nil {
t.Fatal("expected expr5 not to be nil")
}

// 6. GetField(Expression, string)
expr6 := GetField(Variable("doc"), "title")
if expr6 == nil {
t.Fatal("expected expr6 not to be nil")
}
}
Loading
Loading