feat: BI-6882 Cache invalidation in data flow #1672
Pull request overview
Adds a dedicated “invalidation cache” (Redis-backed) to compute a small per-dataset/connection payload and incorporate it into the main data cache key, so cached /result responses automatically refresh when the invalidation payload changes.
Changes:
- Introduces an invalidation cache engine (RedisCacheLock-coordinated) plus service-registry plumbing and settings to provision a dedicated Redis instance/DB.
- Integrates invalidation payload computation into the Data API request flow and threads the payload down to query execution so it affects cache keys.
- Adds unit + integration tests covering invalidation engine primitives and end-to-end cache-key behavior.
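The core idea in the overview (folding an invalidation payload into the data cache key) can be illustrated with a minimal sketch. The helper name `make_data_cache_key` and the hashing scheme are assumptions made for illustration; they are not taken from the PR, which extends the prepared source `data_key` inside `executor.py`.

```python
import hashlib
import json
from typing import Optional


def make_data_cache_key(query_text: str, invalidation_payload: Optional[str]) -> str:
    """Hypothetical helper: hash the query together with the invalidation payload."""
    # sort_keys keeps the serialization deterministic, so equal inputs give equal keys.
    raw = json.dumps(
        {"query": query_text, "invalidation_payload": invalidation_payload},
        sort_keys=True,
    )
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


# Same query, same payload: the cached entry is reused.
key_before = make_data_cache_key("SELECT 1", "2024-01-01T00:00:00")
key_same = make_data_cache_key("SELECT 1", "2024-01-01T00:00:00")
# Same query, changed payload: a different key, so the old entry is bypassed.
key_after = make_data_cache_key("SELECT 1", "2024-01-02T00:00:00")
```

Because the payload is part of the key, no explicit delete is needed: stale entries simply stop being addressed and age out through their normal TTL.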
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| lib/dl_query_processing/dl_query_processing/execution/executor.py | Extends prepared source data_key with invalidation payload so cache keys change with payload. |
| lib/dl_core/dl_core/services_registry/top_level.py | Adds SR getters/factories for invalidation cache Redis + engine factory. |
| lib/dl_core/dl_core/services_registry/sr_factories.py | Wires invalidation Redis factory through SR construction. |
| lib/dl_core/dl_core/services_registry/invalidation_cache_engine_factory.py | New factory for creating InvalidationCacheEngine from SR Redis client. |
| lib/dl_core/dl_core/aio/middlewares/services_registry.py | Adds middleware flag to inject invalidation Redis client factory into SR. |
| lib/dl_core/dl_core/aio/aiohttp_wrappers_data_core.py | Adds request helper to fetch invalidation Redis client from app services. |
| lib/dl_core_testing/dl_core_testing/configuration.py | Adds a dedicated Redis DB setting for invalidation cache in tests. |
| lib/dl_constants/dl_constants/enums.py | Adds RedisInstanceKind.invalidation_caches. |
| lib/dl_cache_engine/dl_cache_engine/processing_helper.py | Adds helper method to query invalidation cache engine and return payload. |
| lib/dl_cache_engine/dl_cache_engine/invalidation.py | New invalidation cache engine implementation (keying, entry schema, RCL coordination). |
| lib/dl_cache_engine/dl_cache_engine/engine.py | Removes trailing whitespace (no functional change). |
| lib/dl_cache_engine/dl_cache_engine_tests/unit/test_invalidation.py | New unit tests for invalidation key + entry serialization helpers. |
| lib/dl_api_lib/dl_api_lib/dataset/view.py | Threads invalidation payload into QueryExecutor. |
| lib/dl_api_lib/dl_api_lib/dataset/validator.py | Extracts filter validation into a standalone helper for reuse. |
| lib/dl_api_lib/dl_api_lib/dataset/cache_invalidation.py | New orchestrator to compute invalidation payload (SQL or formula) with throttling. |
| lib/dl_api_lib/dl_api_lib/app/data_api/resources/dataset/range.py | Updates execute_query signature to accept invalidation/caching controls. |
| lib/dl_api_lib/dl_api_lib/app/data_api/resources/dataset/preview.py | Updates execute_query signature to accept invalidation/caching controls. |
| lib/dl_api_lib/dl_api_lib/app/data_api/resources/dataset/cache_invalidation_test.py | Adjusts test endpoint execution flags to avoid recursion and caching. |
| lib/dl_api_lib/dl_api_lib/app/data_api/resources/dataset/base.py | Computes invalidation payload and passes it into query execution to affect cache keys. |
| lib/dl_api_lib/dl_api_lib/app/data_api/app.py | Provisions invalidation Redis service (single host / sentinel) on app startup. |
| lib/dl_api_lib/dl_api_lib/app_settings.py | Adds Data API settings for invalidation cache enablement + Redis config model. |
| lib/dl_api_lib/dl_api_lib_tests/db/data_api/test_cache_invalidation_data_flow.py | New integration tests asserting payload impacts cache hits/misses. |
| lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py | Enables invalidation Redis settings in test app when requested. |
| lib/dl_api_lib_testing/dl_api_lib_testing/app.py | Passes use_invalidation_cache into SR middleware for tests. |
| app/dl_data_api/dl_data_api/app_factory.py | Passes use_invalidation_cache into SR middleware for the real app factory. |
Copilot reviewed 28 out of 28 changed files in this pull request and generated 5 comments.
| filter_specs=filter_specs, | ||
| meta=RawQueryMetaInfo(query_type=QueryType.result), | ||
| limit=2, | ||
| disable_rls=True, |
disable_rls=True triggers an admin-permission check in the query formalization pipeline (see dl_api_lib/query/formalization/query_formalizer.py), so for non-admin users the formula invalidation query will raise and invalidation will silently degrade to None. If the goal is to run invalidation for regular users, keep RLS enabled here (or execute the invalidation query in a way that doesn’t require disable_rls).
Suggested change:
- disable_rls=True,
+ disable_rls=False,
| block_spec: BlockSpec, | ||
| possible_data_lengths: Optional[Collection] = None, | ||
| profiling_postfix: str = "", | ||
| parameter_value_specs: list[ParameterValueSpec] | None = None, | ||
| allow_cache_usage: bool | None = None, | ||
| cache_invalidation_payload: str | None = None, | ||
| ) -> PostprocessedQuery: |
This override accepts allow_cache_usage and cache_invalidation_payload, but they are never used/forwarded to super().execute_query(...). As a result, invalidation payload and per-request cache enable/disable won’t affect preview queries.
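The forwarding problem described in this comment can be reproduced with a small self-contained sketch. `BaseView` and `PreviewView` are hypothetical stand-ins, and the base method here only echoes its arguments; the real `execute_query` takes a `BlockSpec` and returns a `PostprocessedQuery`.

```python
import asyncio
from typing import Optional


class BaseView:
    """Hypothetical stand-in for the base dataset view."""

    async def execute_query(
        self,
        block_spec: str,
        allow_cache_usage: Optional[bool] = None,
        cache_invalidation_payload: Optional[str] = None,
    ) -> dict:
        # Echo the arguments so the caller can see what actually arrived.
        return {
            "block_spec": block_spec,
            "allow_cache_usage": allow_cache_usage,
            "cache_invalidation_payload": cache_invalidation_payload,
        }


class PreviewView(BaseView):
    """Override that forwards the new parameters instead of dropping them."""

    async def execute_query(
        self,
        block_spec: str,
        allow_cache_usage: Optional[bool] = None,
        cache_invalidation_payload: Optional[str] = None,
    ) -> dict:
        return await super().execute_query(
            block_spec=block_spec,
            allow_cache_usage=allow_cache_usage,
            cache_invalidation_payload=cache_invalidation_payload,
        )


result = asyncio.run(
    PreviewView().execute_query(
        "block", allow_cache_usage=False, cache_invalidation_payload="v2"
    )
)
```

If the override omitted the two keyword arguments in the `super()` call, the base method would see its defaults (`None`) regardless of what the caller passed.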
| block_spec: BlockSpec, | ||
| possible_data_lengths: Optional[Collection] = None, | ||
| profiling_postfix: str = "", | ||
| parameter_value_specs: list[ParameterValueSpec] | None = None, | ||
| allow_cache_usage: bool | None = None, | ||
| cache_invalidation_payload: str | None = None, | ||
| ) -> PostprocessedQuery: |
allow_cache_usage and cache_invalidation_payload were added to the method signature but are not used inside the method (e.g., not forwarded to super().execute_query(...) in the slow-path). This makes invalidation/caching controls ineffective for the range endpoint.
| key = InvalidationCacheKey( | ||
| dataset_id=dataset.uuid or "", | ||
| dataset_revision_id=dataset.revision_id or "", | ||
| connection_id=connection.uuid or "", | ||
| connection_revision_id=connection.revision_id or "", |
Building the invalidation cache key with ... or "" can collapse distinct datasets (or connections) into the same Redis key when IDs/revision IDs are missing (e.g., unsaved datasets in preview flows). Prefer to skip invalidation entirely unless all key components are present/non-empty to avoid cross-entity key collisions.
Suggested change:
- key = InvalidationCacheKey(
-     dataset_id=dataset.uuid or "",
-     dataset_revision_id=dataset.revision_id or "",
-     connection_id=connection.uuid or "",
-     connection_revision_id=connection.revision_id or "",
+ dataset_id = dataset.uuid
+ dataset_revision_id = dataset.revision_id
+ connection_id = connection.uuid
+ connection_revision_id = connection.revision_id
+ if not all((dataset_id, dataset_revision_id, connection_id, connection_revision_id)):
+     LOGGER.debug("Skipping invalidation check because cache key components are missing")
+     return None
+ key = InvalidationCacheKey(
+     dataset_id=dataset_id,
+     dataset_revision_id=dataset_revision_id,
+     connection_id=connection_id,
+     connection_revision_id=connection_revision_id,
Cache invalidation is turned off for new, unsaved datasets.
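The collision risk raised in this thread, and the guard proposed against it, can be sketched as follows. `InvalidationKey`, `naive_key`, and `build_key` are simplified, hypothetical stand-ins for illustration; the real `InvalidationCacheKey` may carry other fields or behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class InvalidationKey:
    """Simplified stand-in for InvalidationCacheKey (assumed shape)."""

    dataset_id: str
    dataset_revision_id: str
    connection_id: str
    connection_revision_id: str


def naive_key(dataset_id, dataset_revision_id, connection_id, connection_revision_id):
    # The `or ""` pattern: missing IDs all collapse to the same empty string.
    return InvalidationKey(
        dataset_id or "",
        dataset_revision_id or "",
        connection_id or "",
        connection_revision_id or "",
    )


def build_key(
    dataset_id, dataset_revision_id, connection_id, connection_revision_id
) -> Optional[InvalidationKey]:
    parts = (dataset_id, dataset_revision_id, connection_id, connection_revision_id)
    # Skip invalidation entirely when any component is missing or empty.
    if not all(parts):
        return None
    return InvalidationKey(*parts)


# Two different unsaved datasets on the same connection share one naive key.
collides = naive_key(None, None, "conn-1", "rev-1") == naive_key(None, None, "conn-1", "rev-1")
skipped = build_key(None, None, "conn-1", "rev-1")
saved = build_key("ds-1", "rev-7", "conn-1", "rev-1")
```

Returning `None` pushes the decision to the caller, which can then run the query without an invalidation payload rather than share a key across unrelated entities.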
| def get_cache_engine(self) -> Optional[InvalidationCacheEngine]: | ||
| redis_client = self.service_registry.get_cache_invalidations_redis_client() | ||
| if redis_client is None: | ||
| LOGGER.info("Can not create invalidation cache engine: service registry did not return a Redis client") |
Log message grammar: "Can not create" → "Cannot create".
Suggested change:
- LOGGER.info("Can not create invalidation cache engine: service registry did not return a Redis client")
+ LOGGER.info("Cannot create invalidation cache engine: service registry did not return a Redis client")
Copilot reviewed 28 out of 28 changed files in this pull request and generated 4 comments.
| result_schema.fields.append(field) | ||
| result_schema.reload_caches() | ||
| | ||
| try: |
result_schema.fields.append(field) and result_schema.reload_caches() happen before the try/finally cleanup. If reload_caches() raises, the temporary invalidation field will remain in dataset.result_schema for the rest of the request (and potentially leak into subsequent operations on the same dataset object). Wrap the append/reload in the existing try/finally (or add an outer try/finally) so cleanup always runs.
Suggested change:
- result_schema.fields.append(field)
- result_schema.reload_caches()
- try:
+ try:
+     result_schema.fields.append(field)
+     result_schema.reload_caches()
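A minimal sketch of why the mutation belongs inside the `try` block: when `reload_caches()` raises, the `finally` clause still removes the temporary field. `FakeResultSchema` and `run_with_temporary_field` are hypothetical stand-ins; the real cleanup code in the PR is not shown in this thread and may do more (for example, reloading caches again).

```python
class FakeResultSchema:
    """Hypothetical stand-in for dataset.result_schema."""

    def __init__(self, fail_on_reload: bool = False) -> None:
        self.fields: list = []
        self.fail_on_reload = fail_on_reload

    def reload_caches(self) -> None:
        if self.fail_on_reload:
            raise RuntimeError("reload failed")


def run_with_temporary_field(schema: FakeResultSchema, field: str) -> str:
    try:
        # Both the mutation and the reload sit inside the try block,
        # so a failure in either one still reaches the cleanup below.
        schema.fields.append(field)
        schema.reload_caches()
        return "ok"
    finally:
        # Guarded removal: safe even if the append itself never happened.
        if field in schema.fields:
            schema.fields.remove(field)


schema = FakeResultSchema(fail_on_reload=True)
try:
    run_with_temporary_field(schema, "tmp_invalidation_field")
except RuntimeError:
    pass  # the error still propagates; only the leak is prevented
```

With the append placed before the `try`, the same failure would leave `"tmp_invalidation_field"` in `schema.fields` for the rest of the request.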
| class RedisPydanticSettings(dl_settings.BaseSettings): | ||
| MODE: RedisMode | ||
| CLUSTER_NAME: str = "" | ||
| HOSTS: Annotated[ | ||
| tuple[str, ...], | ||
| dl_settings.split_validator(","), | ||
| ] = () | ||
| PORT: int = 6379 | ||
| DB: int = 0 | ||
| PASSWORD: str = pydantic.Field(repr=False) | ||
| SSL: bool | None = None | ||
| SOCKET_TIMEOUT: float = 0.0 | ||
| SOCKET_CONNECT_TIMEOUT: float = 0.0 | ||
| | ||
| def as_single_host_url(self) -> str: | ||
| return make_url( | ||
| protocol="rediss" if self.SSL else "redis", | ||
| host=self.HOSTS[0], | ||
| port=self.PORT, | ||
| path=str(self.DB), | ||
| ) |
RedisPydanticSettings.as_single_host_url() indexes self.HOSTS[0], but HOSTS defaults to an empty tuple. With MODE=single_host and missing/empty HOSTS, this will raise IndexError during app startup. Consider adding a pydantic validator to require at least one host (and optionally enforce host requirements per MODE) or make HOSTS required/no default.
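The validation suggested here can be sketched without any dependencies. The example below uses a plain dataclass with `__post_init__` as a stand-in; in the actual pydantic settings class the same check would presumably live in a validator. `RedisSettingsSketch` and the `RedisMode` members are assumptions for illustration, and the URL is built with an f-string rather than the project's `make_url` helper.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Tuple


class RedisMode(Enum):
    # Member names are assumed from the modes mentioned in the review.
    single_host = "single_host"
    sentinel = "sentinel"


@dataclass(frozen=True)
class RedisSettingsSketch:
    MODE: RedisMode
    HOSTS: Tuple[str, ...] = ()
    PORT: int = 6379
    DB: int = 0
    SSL: bool = False

    def __post_init__(self) -> None:
        # Fail at construction time with a clear message instead of
        # raising IndexError later inside as_single_host_url().
        if not self.HOSTS:
            raise ValueError("HOSTS must contain at least one host")

    def as_single_host_url(self) -> str:
        scheme = "rediss" if self.SSL else "redis"
        return f"{scheme}://{self.HOSTS[0]}:{self.PORT}/{self.DB}"


settings = RedisSettingsSketch(MODE=RedisMode.single_host, HOSTS=("localhost",))
url = settings.as_single_host_url()
```

Validating at construction moves the failure to configuration loading, where the error message can name the missing setting directly.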
| async def execute_query( | ||
| self, | ||
| block_spec: BlockSpec, | ||
| possible_data_lengths: Optional[Collection] = None, | ||
| profiling_postfix: str = "", | ||
| parameter_value_specs: list[ParameterValueSpec] | None = None, | ||
| allow_cache_usage: bool | None = None, | ||
| cache_invalidation_payload: str | None = None, | ||
| ) -> PostprocessedQuery: |
The newly added allow_cache_usage / cache_invalidation_payload parameters are currently unused in this override. In the DB fallback path the method calls super().execute_query(...) without forwarding them, so cache overrides are ignored and the invalidation payload computed in execute_all_queries(..., skip_invalidation_check=False) won’t actually affect the cache key for range requests. Please pass these through to super().execute_query (and consider passing parameter_value_specs as well for consistency).
| @@ -105,6 +108,7 @@ async def execute_query( | |||
| block_spec=block_spec, | |||
| profiling_postfix=profiling_postfix, | |||
| parameter_value_specs=parameter_value_specs, | |||
allow_cache_usage is accepted by this override but isn’t forwarded to super().execute_query(...), so any caller-provided cache override is ignored (including internal calls that might want to force-disable caching). Please pass allow_cache_usage=allow_cache_usage through alongside the existing cache_invalidation_payload forwarding.
Suggested change:
- parameter_value_specs=parameter_value_specs,
+ parameter_value_specs=parameter_value_specs,
+ allow_cache_usage=allow_cache_usage,