Skip to content
This repository was archived by the owner on Mar 2, 2026. It is now read-only.

Commit fcd57d9

Browse files
Merge branch 'pipeline-preview' into pipeline_run_kokoro
2 parents 2834940 + 18dfc6a commit fcd57d9

30 files changed

Lines changed: 1510 additions & 365 deletions

google/cloud/firestore_v1/async_pipeline.py

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,20 @@
1313
# limitations under the License.
1414

1515
from __future__ import annotations
16-
from typing import AsyncIterable, TYPE_CHECKING
16+
from typing import TYPE_CHECKING
1717
from google.cloud.firestore_v1 import pipeline_stages as stages
1818
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
19+
from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream
20+
from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot
21+
from google.cloud.firestore_v1.pipeline_result import PipelineResult
1922

2023
if TYPE_CHECKING: # pragma: NO COVER
24+
import datetime
2125
from google.cloud.firestore_v1.async_client import AsyncClient
22-
from google.cloud.firestore_v1.pipeline_result import PipelineResult
2326
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
27+
from google.cloud.firestore_v1.pipeline_expressions import Constant
28+
from google.cloud.firestore_v1.types.document import Value
29+
from google.cloud.firestore_v1.query_profile import PipelineExplainOptions
2430

2531

2632
class AsyncPipeline(_BasePipeline):
@@ -40,7 +46,7 @@ class AsyncPipeline(_BasePipeline):
4046
... .collection("books")
4147
... .where(Field.of("published").gt(1980))
4248
... .select("title", "author")
43-
... async for result in pipeline.execute():
49+
... async for result in pipeline.stream():
4450
... print(result)
4551
4652
Use `client.pipeline()` to create instances of this class.
@@ -58,39 +64,68 @@ def __init__(self, client: AsyncClient, *stages: stages.Stage):
5864

5965
async def execute(
6066
self,
67+
*,
6168
transaction: "AsyncTransaction" | None = None,
62-
) -> list[PipelineResult]:
69+
read_time: datetime.datetime | None = None,
70+
explain_options: PipelineExplainOptions | None = None,
71+
index_mode: str | None = None,
72+
additional_options: dict[str, Value | Constant] = {},
73+
) -> PipelineSnapshot[PipelineResult]:
6374
"""
6475
Executes this pipeline and returns results as a list
6576
6677
Args:
67-
transaction
68-
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
78+
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
6979
An existing transaction that this query will run in.
7080
If a ``transaction`` is used and it already has write operations
7181
added, this method cannot be used (i.e. read-after-write is not
7282
allowed).
83+
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
84+
time. This must be a microsecond precision timestamp within the past one hour, or
85+
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
86+
within the past 7 days. For the most accurate results, use UTC timezone.
87+
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
88+
Options to enable query profiling for this query. When set,
89+
explain_metrics will be available on the returned list.
90+
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
91+
Firestore will reject the request if there are no appropriate indexes to serve the query.
92+
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
93+
These options will take precedence over method arguments if there is a conflict (e.g. explain_options, index_mode)
7394
"""
74-
return [result async for result in self.stream(transaction=transaction)]
95+
kwargs = {k: v for k, v in locals().items() if k != "self"}
96+
stream = AsyncPipelineStream(PipelineResult, self, **kwargs)
97+
results = [result async for result in stream]
98+
return PipelineSnapshot(results, stream)
7599

76-
async def stream(
100+
def stream(
77101
self,
102+
*,
103+
read_time: datetime.datetime | None = None,
78104
transaction: "AsyncTransaction" | None = None,
79-
) -> AsyncIterable[PipelineResult]:
105+
explain_options: PipelineExplainOptions | None = None,
106+
index_mode: str | None = None,
107+
additional_options: dict[str, Value | Constant] = {},
108+
) -> AsyncPipelineStream[PipelineResult]:
80109
"""
81-
Process this pipeline as a stream, providing results through an Iterable
110+
Process this pipeline as a stream, providing results through an AsyncIterable
82111
83112
Args:
84-
transaction
85-
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
113+
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
86114
An existing transaction that this query will run in.
87115
If a ``transaction`` is used and it already has write operations
88116
added, this method cannot be used (i.e. read-after-write is not
89117
allowed).
118+
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
119+
time. This must be a microsecond precision timestamp within the past one hour, or
120+
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
121+
within the past 7 days. For the most accurate results, use UTC timezone.
122+
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
123+
Options to enable query profiling for this query. When set,
124+
explain_metrics will be available on the returned generator.
125+
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
126+
Firestore will reject the request if there are no appropriate indexes to serve the query.
127+
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
128+
These options will take precedence over method arguments if there is a conflict (e.g. explain_options, index_mode)
90129
"""
91-
request = self._prep_execute_request(transaction)
92-
async for response in await self._client._firestore_api.execute_pipeline(
93-
request
94-
):
95-
for result in self._execute_response_helper(response):
96-
yield result
130+
kwargs = {k: v for k, v in locals().items() if k != "self"}
131+
return AsyncPipelineStream(PipelineResult, self, **kwargs)

google/cloud/firestore_v1/base_aggregation.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from google.cloud.firestore_v1.stream_generator import (
4949
StreamGenerator,
5050
)
51+
from google.cloud.firestore_v1.pipeline_source import PipelineSource
5152

5253
import datetime
5354

@@ -356,19 +357,20 @@ def stream(
356357
A generator of the query results.
357358
"""
358359

359-
def pipeline(self):
360+
def _build_pipeline(self, source: "PipelineSource"):
360361
"""
361362
Convert this query into a Pipeline
362363
363364
Queries containing a `cursor` or `limit_to_last` are not currently supported
364365
366+
Args:
367+
source: the PipelineSource to build the pipeline off of
365368
Raises:
366-
- ValueError: raised if Query wasn't created with an associated client
367369
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
368370
Returns:
369371
a Pipeline representing the query
370372
"""
371373
# use autoindexer to keep track of which field number to use for un-aliased fields
372374
autoindexer = itertools.count(start=1)
373375
exprs = [a._to_pipeline_expr(autoindexer) for a in self._aggregations]
374-
return self._nested_query.pipeline().aggregate(*exprs)
376+
return self._nested_query._build_pipeline(source).aggregate(*exprs)

google/cloud/firestore_v1/base_collection.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from google.cloud.firestore_v1.async_document import AsyncDocumentReference
4949
from google.cloud.firestore_v1.document import DocumentReference
5050
from google.cloud.firestore_v1.field_path import FieldPath
51+
from google.cloud.firestore_v1.pipeline_source import PipelineSource
5152
from google.cloud.firestore_v1.query_profile import ExplainOptions
5253
from google.cloud.firestore_v1.query_results import QueryResultsList
5354
from google.cloud.firestore_v1.stream_generator import StreamGenerator
@@ -602,18 +603,20 @@ def find_nearest(
602603
distance_threshold=distance_threshold,
603604
)
604605

605-
def pipeline(self):
606+
def _build_pipeline(self, source: "PipelineSource"):
606607
"""
607608
Convert this query into a Pipeline
608609
609610
Queries containing a `cursor` or `limit_to_last` are not currently supported
610611
612+
Args:
613+
source: the PipelineSource to build the pipeline off of
611614
Raises:
612615
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
613616
Returns:
614617
a Pipeline representing the query
615618
"""
616-
return self._query().pipeline()
619+
return self._query()._build_pipeline(source)
617620

618621

619622
def _auto_id() -> str:

google/cloud/firestore_v1/base_pipeline.py

Lines changed: 4 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,13 @@
1313
# limitations under the License.
1414

1515
from __future__ import annotations
16-
from typing import Iterable, Sequence, TYPE_CHECKING
16+
from typing import Sequence, TYPE_CHECKING
1717
from google.cloud.firestore_v1 import pipeline_stages as stages
1818
from google.cloud.firestore_v1.types.pipeline import (
1919
StructuredPipeline as StructuredPipeline_pb,
2020
)
2121
from google.cloud.firestore_v1.vector import Vector
2222
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
23-
from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest
24-
from google.cloud.firestore_v1.pipeline_result import PipelineResult
2523
from google.cloud.firestore_v1.pipeline_expressions import (
2624
AggregateFunction,
2725
AliasedExpression,
@@ -30,13 +28,10 @@
3028
BooleanExpression,
3129
Selectable,
3230
)
33-
from google.cloud.firestore_v1 import _helpers
3431

3532
if TYPE_CHECKING: # pragma: NO COVER
3633
from google.cloud.firestore_v1.client import Client
3734
from google.cloud.firestore_v1.async_client import AsyncClient
38-
from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse
39-
from google.cloud.firestore_v1.transaction import BaseTransaction
4035

4136

4237
class _BasePipeline:
@@ -87,9 +82,10 @@ def __repr__(self):
8782
stages_str = ",\n ".join([repr(s) for s in self.stages])
8883
return f"{cls_str}(\n {stages_str}\n)"
8984

90-
def _to_pb(self) -> StructuredPipeline_pb:
85+
def _to_pb(self, **options) -> StructuredPipeline_pb:
9186
return StructuredPipeline_pb(
92-
pipeline={"stages": [s._to_pb() for s in self.stages]}
87+
pipeline={"stages": [s._to_pb() for s in self.stages]},
88+
options=options,
9389
)
9490

9591
def _append(self, new_stage):
@@ -98,44 +94,6 @@ def _append(self, new_stage):
9894
"""
9995
return self.__class__._create_with_stages(self._client, *self.stages, new_stage)
10096

101-
def _prep_execute_request(
102-
self, transaction: BaseTransaction | None
103-
) -> ExecutePipelineRequest:
104-
"""
105-
shared logic for creating an ExecutePipelineRequest
106-
"""
107-
database_name = (
108-
f"projects/{self._client.project}/databases/{self._client._database}"
109-
)
110-
transaction_id = (
111-
_helpers.get_transaction_id(transaction)
112-
if transaction is not None
113-
else None
114-
)
115-
request = ExecutePipelineRequest(
116-
database=database_name,
117-
transaction=transaction_id,
118-
structured_pipeline=self._to_pb(),
119-
)
120-
return request
121-
122-
def _execute_response_helper(
123-
self, response: ExecutePipelineResponse
124-
) -> Iterable[PipelineResult]:
125-
"""
126-
shared logic for unpacking an ExecutePipelineReponse into PipelineResults
127-
"""
128-
for doc in response.results:
129-
ref = self._client.document(doc.name) if doc.name else None
130-
yield PipelineResult(
131-
self._client,
132-
doc.fields,
133-
ref,
134-
response._pb.execution_time,
135-
doc._pb.create_time if doc.create_time else None,
136-
doc._pb.update_time if doc.update_time else None,
137-
)
138-
13997
def add_fields(self, *fields: Selectable) -> "_BasePipeline":
14098
"""
14199
Adds new fields to outputs from previous stages.

google/cloud/firestore_v1/base_query.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from google.cloud.firestore_v1.query_profile import ExplainOptions
6868
from google.cloud.firestore_v1.query_results import QueryResultsList
6969
from google.cloud.firestore_v1.stream_generator import StreamGenerator
70+
from google.cloud.firestore_v1.pipeline_source import PipelineSource
7071

7172
import datetime
7273

@@ -1129,24 +1130,23 @@ def recursive(self: QueryType) -> QueryType:
11291130

11301131
return copied
11311132

1132-
def pipeline(self):
1133+
def _build_pipeline(self, source: "PipelineSource"):
11331134
"""
11341135
Convert this query into a Pipeline
11351136
11361137
Queries containing a `cursor` or `limit_to_last` are not currently supported
11371138
1139+
Args:
1140+
source: the PipelineSource to build the pipeline off of
11381141
Raises:
1139-
- ValueError: raised if Query wasn't created with an associated client
11401142
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
11411143
Returns:
11421144
a Pipeline representing the query
11431145
"""
1144-
if not self._client:
1145-
raise ValueError("Query does not have an associated client")
11461146
if self._all_descendants:
1147-
ppl = self._client.pipeline().collection_group(self._parent.id)
1147+
ppl = source.collection_group(self._parent.id)
11481148
else:
1149-
ppl = self._client.pipeline().collection(self._parent._path)
1149+
ppl = source.collection(self._parent._path)
11501150

11511151
# Filters
11521152
for filter_ in self._field_filters:

0 commit comments

Comments
 (0)