|
57 | 57 | TEST_DATABASES, |
58 | 58 | TEST_DATABASES_W_ENTERPRISE, |
59 | 59 | IS_KOKORO_TEST, |
| 60 | + FIRESTORE_ENTERPRISE_DB, |
60 | 61 | ) |
61 | 62 |
|
62 | 63 | RETRIES = retries.AsyncRetry( |
@@ -1611,6 +1612,54 @@ async def test_query_stream_w_read_time(query_docs, cleanup, database): |
1611 | 1612 | assert new_values[new_ref.id] == new_data |
1612 | 1613 |
|
1613 | 1614 |
|
| 1615 | +@pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) |
| 1616 | +async def test_pipeline_w_read_time(query_docs, cleanup, database): |
| 1617 | + # return early on kokoro. Test project doesn't currently support pipelines |
| 1618 | + # TODO: enable pipeline verification when kokoro test project is whitelisted |
| 1619 | + if IS_KOKORO_TEST: |
| 1620 | + pytest.skip("skipping pipeline verification on kokoro") |
| 1621 | + |
| 1622 | + collection, stored, allowed_vals = query_docs |
| 1623 | + num_vals = len(allowed_vals) |
| 1624 | + |
| 1625 | + # Find a read_time before adding the new document. |
| 1626 | + snapshots = await collection.get() |
| 1627 | + read_time = snapshots[0].read_time |
| 1628 | + |
| 1629 | + new_data = { |
| 1630 | + "a": 9000, |
| 1631 | + "b": 1, |
| 1632 | + "c": [10000, 1000], |
| 1633 | + "stats": {"sum": 9001, "product": 9000}, |
| 1634 | + } |
| 1635 | + _, new_ref = await collection.add(new_data) |
| 1636 | + # Add to clean-up. |
| 1637 | + cleanup(new_ref.delete) |
| 1638 | + stored[new_ref.id] = new_data |
| 1639 | + |
| 1640 | + pipeline = collection.where(filter=FieldFilter("b", "==", 1)).pipeline() |
| 1641 | + |
| 1642 | + # new query should have new_data |
| 1643 | + new_results = [result async for result in pipeline.stream()] |
| 1644 | + new_values = {result.ref.id: result.data() for result in new_results} |
| 1645 | + assert len(new_values) == num_vals + 1 |
| 1646 | + assert new_ref.id in new_values |
| 1647 | + assert new_values[new_ref.id] == new_data |
| 1648 | + |
| 1649 | + # pipeline with read_time should not have new_data |
| 1650 | + results = [result async for result in pipeline.stream(read_time=read_time)] |
| 1651 | + |
| 1652 | + values = {result.ref.id: result.data() for result in results} |
| 1653 | + |
| 1654 | + assert len(values) == num_vals |
| 1655 | + assert new_ref.id not in values |
| 1656 | + for key, value in values.items(): |
| 1657 | + assert stored[key] == value |
| 1658 | + assert value["b"] == 1 |
| 1659 | + assert value["a"] != 9000 |
| 1660 | + assert key != new_ref.id |
| 1661 | + |
| 1662 | + |
1614 | 1663 | @pytest.mark.parametrize("database", TEST_DATABASES, indirect=True) |
1615 | 1664 | async def test_query_with_order_dot_key(client, cleanup, database): |
1616 | 1665 | db = client |
|
0 commit comments