Skip to content
This repository was archived by the owner on Mar 2, 2026. It is now read-only.

Commit 138ad9b

Browse files
Merge branch 'pipeline-preview' into fix_expressions
2 parents 4b81052 + aef4391 commit 138ad9b

26 files changed

Lines changed: 365 additions & 243 deletions

google/cloud/firestore_v1/async_pipeline.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
1919

2020
if TYPE_CHECKING: # pragma: NO COVER
21+
import datetime
2122
from google.cloud.firestore_v1.async_client import AsyncClient
2223
from google.cloud.firestore_v1.pipeline_result import PipelineResult
2324
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
@@ -59,6 +60,7 @@ def __init__(self, client: AsyncClient, *stages: stages.Stage):
5960
async def execute(
6061
self,
6162
transaction: "AsyncTransaction" | None = None,
63+
read_time: datetime.datetime | None = None,
6264
) -> list[PipelineResult]:
6365
"""
6466
Executes this pipeline and returns results as a list
@@ -70,12 +72,22 @@ async def execute(
7072
If a ``transaction`` is used and it already has write operations
7173
added, this method cannot be used (i.e. read-after-write is not
7274
allowed).
75+
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
76+
time. This must be a microsecond precision timestamp within the past one hour, or
77+
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
78+
within the past 7 days. For the most accurate results, use UTC timezone.
7379
"""
74-
return [result async for result in self.stream(transaction=transaction)]
80+
return [
81+
result
82+
async for result in self.stream(
83+
transaction=transaction, read_time=read_time
84+
)
85+
]
7586

7687
async def stream(
7788
self,
7889
transaction: "AsyncTransaction" | None = None,
90+
read_time: datetime.datetime | None = None,
7991
) -> AsyncIterable[PipelineResult]:
8092
"""
8193
Process this pipeline as a stream, providing results through an Iterable
@@ -87,8 +99,12 @@ async def stream(
8799
If a ``transaction`` is used and it already has write operations
88100
added, this method cannot be used (i.e. read-after-write is not
89101
allowed).
102+
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
103+
time. This must be a microsecond precision timestamp within the past one hour, or
104+
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
105+
within the past 7 days. For the most accurate results, use UTC timezone.
90106
"""
91-
request = self._prep_execute_request(transaction)
107+
request = self._prep_execute_request(transaction, read_time)
92108
async for response in await self._client._firestore_api.execute_pipeline(
93109
request
94110
):

google/cloud/firestore_v1/base_aggregation.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from google.cloud.firestore_v1.stream_generator import (
4949
StreamGenerator,
5050
)
51+
from google.cloud.firestore_v1.pipeline_source import PipelineSource
5152

5253
import datetime
5354

@@ -356,19 +357,20 @@ def stream(
356357
A generator of the query results.
357358
"""
358359

359-
def pipeline(self):
360+
def _build_pipeline(self, source: "PipelineSource"):
360361
"""
361362
Convert this query into a Pipeline
362363
363364
Queries containing a `cursor` or `limit_to_last` are not currently supported
364365
366+
Args:
367+
source: the PipelineSource to build the pipeline off of
365368
Raises:
366-
- ValueError: raised if Query wasn't created with an associated client
367369
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
368370
Returns:
369371
a Pipeline representing the query
370372
"""
371373
# use autoindexer to keep track of which field number to use for un-aliased fields
372374
autoindexer = itertools.count(start=1)
373375
exprs = [a._to_pipeline_expr(autoindexer) for a in self._aggregations]
374-
return self._nested_query.pipeline().aggregate(*exprs)
376+
return self._nested_query._build_pipeline(source).aggregate(*exprs)

google/cloud/firestore_v1/base_collection.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from google.cloud.firestore_v1.async_document import AsyncDocumentReference
4949
from google.cloud.firestore_v1.document import DocumentReference
5050
from google.cloud.firestore_v1.field_path import FieldPath
51+
from google.cloud.firestore_v1.pipeline_source import PipelineSource
5152
from google.cloud.firestore_v1.query_profile import ExplainOptions
5253
from google.cloud.firestore_v1.query_results import QueryResultsList
5354
from google.cloud.firestore_v1.stream_generator import StreamGenerator
@@ -602,18 +603,20 @@ def find_nearest(
602603
distance_threshold=distance_threshold,
603604
)
604605

605-
def pipeline(self):
606+
def _build_pipeline(self, source: "PipelineSource"):
606607
"""
607608
Convert this query into a Pipeline
608609
609610
Queries containing a `cursor` or `limit_to_last` are not currently supported
610611
612+
Args:
613+
source: the PipelineSource to build the pipeline off of
611614
Raises:
612615
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
613616
Returns:
614617
a Pipeline representing the query
615618
"""
616-
return self._query().pipeline()
619+
return self._query()._build_pipeline(source)
617620

618621

619622
def _auto_id() -> str:

google/cloud/firestore_v1/base_pipeline.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from google.cloud.firestore_v1 import _helpers
3434

3535
if TYPE_CHECKING: # pragma: NO COVER
36+
import datetime
3637
from google.cloud.firestore_v1.client import Client
3738
from google.cloud.firestore_v1.async_client import AsyncClient
3839
from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse
@@ -99,7 +100,9 @@ def _append(self, new_stage):
99100
return self.__class__._create_with_stages(self._client, *self.stages, new_stage)
100101

101102
def _prep_execute_request(
102-
self, transaction: BaseTransaction | None
103+
self,
104+
transaction: BaseTransaction | None,
105+
read_time: datetime.datetime | None,
103106
) -> ExecutePipelineRequest:
104107
"""
105108
shared logic for creating an ExecutePipelineRequest
@@ -116,6 +119,7 @@ def _prep_execute_request(
116119
database=database_name,
117120
transaction=transaction_id,
118121
structured_pipeline=self._to_pb(),
122+
read_time=read_time,
119123
)
120124
return request
121125

google/cloud/firestore_v1/base_query.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from google.cloud.firestore_v1.query_profile import ExplainOptions
6868
from google.cloud.firestore_v1.query_results import QueryResultsList
6969
from google.cloud.firestore_v1.stream_generator import StreamGenerator
70+
from google.cloud.firestore_v1.pipeline_source import PipelineSource
7071

7172
import datetime
7273

@@ -1129,24 +1130,23 @@ def recursive(self: QueryType) -> QueryType:
11291130

11301131
return copied
11311132

1132-
def pipeline(self):
1133+
def _build_pipeline(self, source: "PipelineSource"):
11331134
"""
11341135
Convert this query into a Pipeline
11351136
11361137
Queries containing a `cursor` or `limit_to_last` are not currently supported
11371138
1139+
Args:
1140+
source: the PipelineSource to build the pipeline off of
11381141
Raises:
1139-
- ValueError: raised if Query wasn't created with an associated client
11401142
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
11411143
Returns:
11421144
a Pipeline representing the query
11431145
"""
1144-
if not self._client:
1145-
raise ValueError("Query does not have an associated client")
11461146
if self._all_descendants:
1147-
ppl = self._client.pipeline().collection_group(self._parent.id)
1147+
ppl = source.collection_group(self._parent.id)
11481148
else:
1149-
ppl = self._client.pipeline().collection(self._parent._path)
1149+
ppl = source.collection(self._parent._path)
11501150

11511151
# Filters
11521152
for filter_ in self._field_filters:

google/cloud/firestore_v1/pipeline.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
1919

2020
if TYPE_CHECKING: # pragma: NO COVER
21+
import datetime
2122
from google.cloud.firestore_v1.client import Client
2223
from google.cloud.firestore_v1.pipeline_result import PipelineResult
2324
from google.cloud.firestore_v1.transaction import Transaction
@@ -56,6 +57,7 @@ def __init__(self, client: Client, *stages: stages.Stage):
5657
def execute(
5758
self,
5859
transaction: "Transaction" | None = None,
60+
read_time: datetime.datetime | None = None,
5961
) -> list[PipelineResult]:
6062
"""
6163
Executes this pipeline and returns results as a list
@@ -67,12 +69,20 @@ def execute(
6769
If a ``transaction`` is used and it already has write operations
6870
added, this method cannot be used (i.e. read-after-write is not
6971
allowed).
72+
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
73+
time. This must be a microsecond precision timestamp within the past one hour, or
74+
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
75+
within the past 7 days. For the most accurate results, use UTC timezone.
7076
"""
71-
return [result for result in self.stream(transaction=transaction)]
77+
return [
78+
result
79+
for result in self.stream(transaction=transaction, read_time=read_time)
80+
]
7281

7382
def stream(
7483
self,
7584
transaction: "Transaction" | None = None,
85+
read_time: datetime.datetime | None = None,
7686
) -> Iterable[PipelineResult]:
7787
"""
7888
Process this pipeline as a stream, providing results through an Iterable
@@ -84,7 +94,11 @@ def stream(
8494
If a ``transaction`` is used and it already has write operations
8595
added, this method cannot be used (i.e. read-after-write is not
8696
allowed).
97+
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
98+
time. This must be a microsecond precision timestamp within the past one hour, or
99+
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
100+
within the past 7 days. For the most accurate results, use UTC timezone.
87101
"""
88-
request = self._prep_execute_request(transaction)
102+
request = self._prep_execute_request(transaction, read_time)
89103
for response in self._client._firestore_api.execute_pipeline(request):
90104
yield from self._execute_response_helper(response)

google/cloud/firestore_v1/pipeline_expressions.py

Lines changed: 8 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -835,56 +835,6 @@ def if_absent(self, default_value: Expression | CONSTANT_TYPE) -> "Expression":
835835
[self, self._cast_to_expr_or_convert_to_constant(default_value)],
836836
)
837837

838-
@expose_as_static
839-
def is_nan(self) -> "BooleanExpression":
840-
"""Creates an expression that checks if this expression evaluates to 'NaN' (Not a Number).
841-
842-
Example:
843-
>>> # Check if the result of a calculation is NaN
844-
>>> Field.of("value").divide(0).is_nan()
845-
846-
Returns:
847-
A new `Expression` representing the 'isNaN' check.
848-
"""
849-
return BooleanExpression("is_nan", [self])
850-
851-
@expose_as_static
852-
def is_not_nan(self) -> "BooleanExpression":
853-
"""Creates an expression that checks if this expression evaluates to a non-'NaN' (Not a Number) value.
854-
855-
Example:
856-
>>> # Check if the result of a calculation is not NaN
857-
>>> Field.of("value").divide(1).is_not_nan()
858-
859-
Returns:
860-
A new `Expression` representing the 'is not NaN' check.
861-
"""
862-
return BooleanExpression("is_not_nan", [self])
863-
864-
@expose_as_static
865-
def is_null(self) -> "BooleanExpression":
866-
"""Creates an expression that checks if the value of a field is 'Null'.
867-
868-
Example:
869-
>>> Field.of("value").is_null()
870-
871-
Returns:
872-
A new `Expression` representing the 'isNull' check.
873-
"""
874-
return BooleanExpression("is_null", [self])
875-
876-
@expose_as_static
877-
def is_not_null(self) -> "BooleanExpression":
878-
"""Creates an expression that checks if the value of a field is not 'Null'.
879-
880-
Example:
881-
>>> Field.of("value").is_not_null()
882-
883-
Returns:
884-
A new `Expression` representing the 'isNotNull' check.
885-
"""
886-
return BooleanExpression("is_not_null", [self])
887-
888838
@expose_as_static
889839
def is_error(self):
890840
"""Creates an expression that checks if a given expression produces an error
@@ -1672,7 +1622,10 @@ def of(value: CONSTANT_TYPE) -> Constant[CONSTANT_TYPE]:
16721622
return Constant(value)
16731623

16741624
def __repr__(self):
1675-
return f"Constant.of({self.value!r})"
1625+
value_str = repr(self.value)
1626+
if isinstance(self.value, float) and value_str == "nan":
1627+
value_str = "math.nan"
1628+
return f"Constant.of({value_str})"
16761629

16771630
def __hash__(self):
16781631
return hash(self.value)
@@ -1846,13 +1799,13 @@ def _from_query_filter_pb(filter_pb, client):
18461799
elif isinstance(filter_pb, Query_pb.UnaryFilter):
18471800
field = Field.of(filter_pb.field.field_path)
18481801
if filter_pb.op == Query_pb.UnaryFilter.Operator.IS_NAN:
1849-
return And(field.exists(), field.is_nan())
1802+
return And(field.exists(), field.equal(float("nan")))
18501803
elif filter_pb.op == Query_pb.UnaryFilter.Operator.IS_NOT_NAN:
1851-
return And(field.exists(), field.is_not_nan())
1804+
return And(field.exists(), Not(field.equal(float("nan"))))
18521805
elif filter_pb.op == Query_pb.UnaryFilter.Operator.IS_NULL:
1853-
return And(field.exists(), field.is_null())
1806+
return And(field.exists(), field.equal(None))
18541807
elif filter_pb.op == Query_pb.UnaryFilter.Operator.IS_NOT_NULL:
1855-
return And(field.exists(), field.is_not_null())
1808+
return And(field.exists(), Not(field.equal(None)))
18561809
else:
18571810
raise TypeError(f"Unexpected UnaryFilter operator type: {filter_pb.op}")
18581811
elif isinstance(filter_pb, Query_pb.FieldFilter):

google/cloud/firestore_v1/pipeline_source.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
from google.cloud.firestore_v1.client import Client
2323
from google.cloud.firestore_v1.async_client import AsyncClient
2424
from google.cloud.firestore_v1.base_document import BaseDocumentReference
25+
from google.cloud.firestore_v1.base_query import BaseQuery
26+
from google.cloud.firestore_v1.base_aggregation import BaseAggregationQuery
27+
from google.cloud.firestore_v1.base_collection import BaseCollectionReference
2528

2629

2730
PipelineType = TypeVar("PipelineType", bound=_BasePipeline)
@@ -43,6 +46,23 @@ def __init__(self, client: Client | AsyncClient):
4346
def _create_pipeline(self, source_stage):
4447
return self.client._pipeline_cls._create_with_stages(self.client, source_stage)
4548

49+
def create_from(
50+
self, query: "BaseQuery" | "BaseAggregationQuery" | "BaseCollectionReference"
51+
) -> PipelineType:
52+
"""
53+
Create a pipeline from an existing query
54+
55+
Queries containing a `cursor` or `limit_to_last` are not currently supported
56+
57+
Args:
58+
query: the query to build the pipeline off of
59+
Raises:
60+
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
61+
Returns:
62+
a new pipeline instance representing the query
63+
"""
64+
return query._build_pipeline(self)
65+
4666
def collection(self, path: str | tuple[str]) -> PipelineType:
4767
"""
4868
Creates a new Pipeline that operates on a specified Firestore collection.

tests/system/pipeline_e2e/array.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ tests:
185185
- AliasedExpression:
186186
- Function.array_concat:
187187
- Field: tags
188-
- Constant: ["new_tag", "another_tag"]
188+
- ["new_tag", "another_tag"]
189189
- "concatenatedTags"
190190
assert_results:
191191
- concatenatedTags:
@@ -232,8 +232,8 @@ tests:
232232
- AliasedExpression:
233233
- Function.array_concat:
234234
- Field: tags
235-
- Constant: ["sci-fi"]
236-
- Constant: ["classic", "epic"]
235+
- ["sci-fi"]
236+
- ["classic", "epic"]
237237
- "concatenatedTags"
238238
assert_results:
239239
- concatenatedTags:

tests/system/pipeline_e2e/data.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,9 @@ data:
139139
vec3:
140140
embedding: [5.0, 6.0, 7.0]
141141
vec4:
142-
embedding: [1.0, 2.0, 4.0]
142+
embedding: [1.0, 2.0, 4.0]
143+
errors:
144+
doc_with_nan:
145+
value: "NaN"
146+
doc_with_null:
147+
value: null

0 commit comments

Comments
 (0)