Skip to content

Commit d214f90

Browse files
chrisburrLoxeris
andauthored
feat: add rss router serving cached resource status (#940)
* feat: add rss router serving cached resource status Adds the concrete resource status sources (storage, compute, FTS, site) backed by ResourceStatusDB and serves them through new /api/rss/{storage,compute,site,fts} endpoints. Responses are read from an all-VO snapshot cached per source, filtered to the caller's VO, and carry ETag/Last-Modified headers with 304 support; any authenticated user may read. Includes the regenerated diracx and gubbins clients. Closes #839 Co-authored-by: Loxeris <30194187+Loxeris@users.noreply.github.com> * fix(rss): filter revision query by element_type Compute and FTS sources share the all status type, so latest_revision() computed identical revisions for both and counted rows from the other element type. Pass element_type to get_resource_status_date() so the revision tracks exactly the rows read_raw returns. --------- Co-authored-by: Loxeris <30194187+Loxeris@users.noreply.github.com>
1 parent 1f85ab9 commit d214f90

24 files changed

Lines changed: 2807 additions & 5 deletions

File tree

diracx-client/src/diracx/client/_generated/_client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from . import models as _models
1616
from ._configuration import DiracConfiguration
1717
from ._utils.serialization import Deserializer, Serializer
18-
from .operations import AuthOperations, ConfigOperations, JobsOperations, WellKnownOperations
18+
from .operations import AuthOperations, ConfigOperations, JobsOperations, RssOperations, WellKnownOperations
1919

2020

2121
class Dirac: # pylint: disable=client-accepts-api-version-keyword
@@ -29,6 +29,8 @@ class Dirac: # pylint: disable=client-accepts-api-version-keyword
2929
:vartype config: _generated.operations.ConfigOperations
3030
:ivar jobs: JobsOperations operations
3131
:vartype jobs: _generated.operations.JobsOperations
32+
:ivar rss: RssOperations operations
33+
:vartype rss: _generated.operations.RssOperations
3234
:keyword endpoint: Service URL. Required. Default value is "".
3335
:paramtype endpoint: str
3436
"""
@@ -65,6 +67,7 @@ def __init__( # pylint: disable=missing-client-constructor-parameter-credential
6567
self.auth = AuthOperations(self._client, self._config, self._serialize, self._deserialize)
6668
self.config = ConfigOperations(self._client, self._config, self._serialize, self._deserialize)
6769
self.jobs = JobsOperations(self._client, self._config, self._serialize, self._deserialize)
70+
self.rss = RssOperations(self._client, self._config, self._serialize, self._deserialize)
6871

6972
def send_request(self, request: HttpRequest, *, stream: bool = False, **kwargs: Any) -> HttpResponse:
7073
"""Runs the network request through the client's chained policies.

diracx-client/src/diracx/client/_generated/aio/_client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from .. import models as _models
1616
from .._utils.serialization import Deserializer, Serializer
1717
from ._configuration import DiracConfiguration
18-
from .operations import AuthOperations, ConfigOperations, JobsOperations, WellKnownOperations
18+
from .operations import AuthOperations, ConfigOperations, JobsOperations, RssOperations, WellKnownOperations
1919

2020

2121
class Dirac: # pylint: disable=client-accepts-api-version-keyword
@@ -29,6 +29,8 @@ class Dirac: # pylint: disable=client-accepts-api-version-keyword
2929
:vartype config: _generated.aio.operations.ConfigOperations
3030
:ivar jobs: JobsOperations operations
3131
:vartype jobs: _generated.aio.operations.JobsOperations
32+
:ivar rss: RssOperations operations
33+
:vartype rss: _generated.aio.operations.RssOperations
3234
:keyword endpoint: Service URL. Required. Default value is "".
3335
:paramtype endpoint: str
3436
"""
@@ -65,6 +67,7 @@ def __init__( # pylint: disable=missing-client-constructor-parameter-credential
6567
self.auth = AuthOperations(self._client, self._config, self._serialize, self._deserialize)
6668
self.config = ConfigOperations(self._client, self._config, self._serialize, self._deserialize)
6769
self.jobs = JobsOperations(self._client, self._config, self._serialize, self._deserialize)
70+
self.rss = RssOperations(self._client, self._config, self._serialize, self._deserialize)
6871

6972
def send_request(
7073
self, request: HttpRequest, *, stream: bool = False, **kwargs: Any

diracx-client/src/diracx/client/_generated/aio/operations/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from ._operations import AuthOperations # type: ignore
1515
from ._operations import ConfigOperations # type: ignore
1616
from ._operations import JobsOperations # type: ignore
17+
from ._operations import RssOperations # type: ignore
1718

1819
from ._patch import __all__ as _patch_all
1920
from ._patch import *
@@ -24,6 +25,7 @@
2425
"AuthOperations",
2526
"ConfigOperations",
2627
"JobsOperations",
28+
"RssOperations",
2729
]
2830
__all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore
2931
_patch_sdk()

diracx-client/src/diracx/client/_generated/aio/operations/_operations.py

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@
5151
build_jobs_summary_request,
5252
build_jobs_unassign_bulk_jobs_sandboxes_request,
5353
build_jobs_unassign_job_sandboxes_request,
54+
build_rss_get_compute_status_request,
55+
build_rss_get_fts_status_request,
56+
build_rss_get_site_status_request,
57+
build_rss_get_storage_status_request,
5458
build_well_known_get_installation_metadata_request,
5559
build_well_known_get_jwks_request,
5660
build_well_known_get_openid_configuration_request,
@@ -2319,3 +2323,303 @@ async def submit_jdl_jobs(self, body: Union[list[str], IO[bytes]], **kwargs: Any
23192323
return cls(pipeline_response, deserialized, {}) # type: ignore
23202324

23212325
return deserialized # type: ignore
2326+
2327+
2328+
class RssOperations:
2329+
"""
2330+
.. warning::
2331+
**DO NOT** instantiate this class directly.
2332+
2333+
Instead, you should access the following operations through
2334+
:class:`~_generated.aio.Dirac`'s
2335+
:attr:`rss` attribute.
2336+
"""
2337+
2338+
models = _models
2339+
2340+
def __init__(self, *args, **kwargs) -> None:
2341+
input_args = list(args)
2342+
self._client: AsyncPipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
2343+
self._config: DiracConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
2344+
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
2345+
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
2346+
2347+
@distributed_trace_async
2348+
async def get_storage_status(
2349+
self,
2350+
*,
2351+
if_modified_since: Optional[str] = None,
2352+
etag: Optional[str] = None,
2353+
match_condition: Optional[MatchConditions] = None,
2354+
**kwargs: Any
2355+
) -> dict[str, _models.StorageElementStatus]:
2356+
"""Get Storage Status.
2357+
2358+
Get the latest status of storage elements, scoped to the caller's VO.
2359+
2360+
:keyword if_modified_since: Default value is None.
2361+
:paramtype if_modified_since: str
2362+
:keyword etag: check if resource is changed. Set None to skip checking etag. Default value is
2363+
None.
2364+
:paramtype etag: str
2365+
:keyword match_condition: The match condition to use upon the etag. Default value is None.
2366+
:paramtype match_condition: ~azure.core.MatchConditions
2367+
:return: dict mapping str to StorageElementStatus
2368+
:rtype: dict[str, ~_generated.models.StorageElementStatus]
2369+
:raises ~azure.core.exceptions.HttpResponseError:
2370+
"""
2371+
error_map: MutableMapping = {
2372+
401: ClientAuthenticationError,
2373+
404: ResourceNotFoundError,
2374+
409: ResourceExistsError,
2375+
304: ResourceNotModifiedError,
2376+
}
2377+
if match_condition == MatchConditions.IfNotModified:
2378+
error_map[412] = ResourceModifiedError
2379+
elif match_condition == MatchConditions.IfPresent:
2380+
error_map[412] = ResourceNotFoundError
2381+
elif match_condition == MatchConditions.IfMissing:
2382+
error_map[412] = ResourceExistsError
2383+
error_map.update(kwargs.pop("error_map", {}) or {})
2384+
2385+
_headers = kwargs.pop("headers", {}) or {}
2386+
_params = kwargs.pop("params", {}) or {}
2387+
2388+
cls: ClsType[dict[str, _models.StorageElementStatus]] = kwargs.pop("cls", None)
2389+
2390+
_request = build_rss_get_storage_status_request(
2391+
if_modified_since=if_modified_since,
2392+
etag=etag,
2393+
match_condition=match_condition,
2394+
headers=_headers,
2395+
params=_params,
2396+
)
2397+
_request.url = self._client.format_url(_request.url)
2398+
2399+
_stream = False
2400+
pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
2401+
_request, stream=_stream, **kwargs
2402+
)
2403+
2404+
response = pipeline_response.http_response
2405+
2406+
if response.status_code not in [200]:
2407+
map_error(status_code=response.status_code, response=response, error_map=error_map)
2408+
raise HttpResponseError(response=response)
2409+
2410+
deserialized = self._deserialize("{StorageElementStatus}", pipeline_response.http_response)
2411+
2412+
if cls:
2413+
return cls(pipeline_response, deserialized, {}) # type: ignore
2414+
2415+
return deserialized # type: ignore
2416+
2417+
@distributed_trace_async
2418+
async def get_compute_status(
2419+
self,
2420+
*,
2421+
if_modified_since: Optional[str] = None,
2422+
etag: Optional[str] = None,
2423+
match_condition: Optional[MatchConditions] = None,
2424+
**kwargs: Any
2425+
) -> dict[str, _models.ComputeElementStatus]:
2426+
"""Get Compute Status.
2427+
2428+
Get the latest status of compute elements, scoped to the caller's VO.
2429+
2430+
:keyword if_modified_since: Default value is None.
2431+
:paramtype if_modified_since: str
2432+
:keyword etag: check if resource is changed. Set None to skip checking etag. Default value is
2433+
None.
2434+
:paramtype etag: str
2435+
:keyword match_condition: The match condition to use upon the etag. Default value is None.
2436+
:paramtype match_condition: ~azure.core.MatchConditions
2437+
:return: dict mapping str to ComputeElementStatus
2438+
:rtype: dict[str, ~_generated.models.ComputeElementStatus]
2439+
:raises ~azure.core.exceptions.HttpResponseError:
2440+
"""
2441+
error_map: MutableMapping = {
2442+
401: ClientAuthenticationError,
2443+
404: ResourceNotFoundError,
2444+
409: ResourceExistsError,
2445+
304: ResourceNotModifiedError,
2446+
}
2447+
if match_condition == MatchConditions.IfNotModified:
2448+
error_map[412] = ResourceModifiedError
2449+
elif match_condition == MatchConditions.IfPresent:
2450+
error_map[412] = ResourceNotFoundError
2451+
elif match_condition == MatchConditions.IfMissing:
2452+
error_map[412] = ResourceExistsError
2453+
error_map.update(kwargs.pop("error_map", {}) or {})
2454+
2455+
_headers = kwargs.pop("headers", {}) or {}
2456+
_params = kwargs.pop("params", {}) or {}
2457+
2458+
cls: ClsType[dict[str, _models.ComputeElementStatus]] = kwargs.pop("cls", None)
2459+
2460+
_request = build_rss_get_compute_status_request(
2461+
if_modified_since=if_modified_since,
2462+
etag=etag,
2463+
match_condition=match_condition,
2464+
headers=_headers,
2465+
params=_params,
2466+
)
2467+
_request.url = self._client.format_url(_request.url)
2468+
2469+
_stream = False
2470+
pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
2471+
_request, stream=_stream, **kwargs
2472+
)
2473+
2474+
response = pipeline_response.http_response
2475+
2476+
if response.status_code not in [200]:
2477+
map_error(status_code=response.status_code, response=response, error_map=error_map)
2478+
raise HttpResponseError(response=response)
2479+
2480+
deserialized = self._deserialize("{ComputeElementStatus}", pipeline_response.http_response)
2481+
2482+
if cls:
2483+
return cls(pipeline_response, deserialized, {}) # type: ignore
2484+
2485+
return deserialized # type: ignore
2486+
2487+
@distributed_trace_async
2488+
async def get_site_status(
2489+
self,
2490+
*,
2491+
if_modified_since: Optional[str] = None,
2492+
etag: Optional[str] = None,
2493+
match_condition: Optional[MatchConditions] = None,
2494+
**kwargs: Any
2495+
) -> dict[str, _models.SiteStatus]:
2496+
"""Get Site Status.
2497+
2498+
Get the latest status of sites, scoped to the caller's VO.
2499+
2500+
:keyword if_modified_since: Default value is None.
2501+
:paramtype if_modified_since: str
2502+
:keyword etag: check if resource is changed. Set None to skip checking etag. Default value is
2503+
None.
2504+
:paramtype etag: str
2505+
:keyword match_condition: The match condition to use upon the etag. Default value is None.
2506+
:paramtype match_condition: ~azure.core.MatchConditions
2507+
:return: dict mapping str to SiteStatus
2508+
:rtype: dict[str, ~_generated.models.SiteStatus]
2509+
:raises ~azure.core.exceptions.HttpResponseError:
2510+
"""
2511+
error_map: MutableMapping = {
2512+
401: ClientAuthenticationError,
2513+
404: ResourceNotFoundError,
2514+
409: ResourceExistsError,
2515+
304: ResourceNotModifiedError,
2516+
}
2517+
if match_condition == MatchConditions.IfNotModified:
2518+
error_map[412] = ResourceModifiedError
2519+
elif match_condition == MatchConditions.IfPresent:
2520+
error_map[412] = ResourceNotFoundError
2521+
elif match_condition == MatchConditions.IfMissing:
2522+
error_map[412] = ResourceExistsError
2523+
error_map.update(kwargs.pop("error_map", {}) or {})
2524+
2525+
_headers = kwargs.pop("headers", {}) or {}
2526+
_params = kwargs.pop("params", {}) or {}
2527+
2528+
cls: ClsType[dict[str, _models.SiteStatus]] = kwargs.pop("cls", None)
2529+
2530+
_request = build_rss_get_site_status_request(
2531+
if_modified_since=if_modified_since,
2532+
etag=etag,
2533+
match_condition=match_condition,
2534+
headers=_headers,
2535+
params=_params,
2536+
)
2537+
_request.url = self._client.format_url(_request.url)
2538+
2539+
_stream = False
2540+
pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
2541+
_request, stream=_stream, **kwargs
2542+
)
2543+
2544+
response = pipeline_response.http_response
2545+
2546+
if response.status_code not in [200]:
2547+
map_error(status_code=response.status_code, response=response, error_map=error_map)
2548+
raise HttpResponseError(response=response)
2549+
2550+
deserialized = self._deserialize("{SiteStatus}", pipeline_response.http_response)
2551+
2552+
if cls:
2553+
return cls(pipeline_response, deserialized, {}) # type: ignore
2554+
2555+
return deserialized # type: ignore
2556+
2557+
@distributed_trace_async
2558+
async def get_fts_status(
2559+
self,
2560+
*,
2561+
if_modified_since: Optional[str] = None,
2562+
etag: Optional[str] = None,
2563+
match_condition: Optional[MatchConditions] = None,
2564+
**kwargs: Any
2565+
) -> dict[str, _models.FTSStatus]:
2566+
"""Get Fts Status.
2567+
2568+
Get the latest status of FTS servers, scoped to the caller's VO.
2569+
2570+
:keyword if_modified_since: Default value is None.
2571+
:paramtype if_modified_since: str
2572+
:keyword etag: check if resource is changed. Set None to skip checking etag. Default value is
2573+
None.
2574+
:paramtype etag: str
2575+
:keyword match_condition: The match condition to use upon the etag. Default value is None.
2576+
:paramtype match_condition: ~azure.core.MatchConditions
2577+
:return: dict mapping str to FTSStatus
2578+
:rtype: dict[str, ~_generated.models.FTSStatus]
2579+
:raises ~azure.core.exceptions.HttpResponseError:
2580+
"""
2581+
error_map: MutableMapping = {
2582+
401: ClientAuthenticationError,
2583+
404: ResourceNotFoundError,
2584+
409: ResourceExistsError,
2585+
304: ResourceNotModifiedError,
2586+
}
2587+
if match_condition == MatchConditions.IfNotModified:
2588+
error_map[412] = ResourceModifiedError
2589+
elif match_condition == MatchConditions.IfPresent:
2590+
error_map[412] = ResourceNotFoundError
2591+
elif match_condition == MatchConditions.IfMissing:
2592+
error_map[412] = ResourceExistsError
2593+
error_map.update(kwargs.pop("error_map", {}) or {})
2594+
2595+
_headers = kwargs.pop("headers", {}) or {}
2596+
_params = kwargs.pop("params", {}) or {}
2597+
2598+
cls: ClsType[dict[str, _models.FTSStatus]] = kwargs.pop("cls", None)
2599+
2600+
_request = build_rss_get_fts_status_request(
2601+
if_modified_since=if_modified_since,
2602+
etag=etag,
2603+
match_condition=match_condition,
2604+
headers=_headers,
2605+
params=_params,
2606+
)
2607+
_request.url = self._client.format_url(_request.url)
2608+
2609+
_stream = False
2610+
pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
2611+
_request, stream=_stream, **kwargs
2612+
)
2613+
2614+
response = pipeline_response.http_response
2615+
2616+
if response.status_code not in [200]:
2617+
map_error(status_code=response.status_code, response=response, error_map=error_map)
2618+
raise HttpResponseError(response=response)
2619+
2620+
deserialized = self._deserialize("{FTSStatus}", pipeline_response.http_response)
2621+
2622+
if cls:
2623+
return cls(pipeline_response, deserialized, {}) # type: ignore
2624+
2625+
return deserialized # type: ignore

0 commit comments

Comments
 (0)