Skip to content

Commit 5bf8143

Browse files
committed
feat: enable remote pilot logging system
1 parent 4ddba9d commit 5bf8143

File tree

16 files changed

+424
-22
lines changed

16 files changed

+424
-22
lines changed

diracx-core/src/diracx/core/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ def __init__(self, job_id: int, detail: str | None = None):
4646
super().__init__(f"Job {job_id} not found" + (" ({detail})" if detail else ""))
4747

4848

49+
class PilotNotFoundError(Exception):
    """Raised when no pilot matches the requested pilot stamp.

    Args:
        pilot_stamp: Stamp identifying the pilot that could not be found.
            It is kept on the exception as ``pilot_stamp``.
        detail: Optional extra context, appended to the message in
            parentheses when it is non-empty.
    """

    def __init__(self, pilot_stamp: str, detail: str | None = None):
        self.pilot_stamp: str = pilot_stamp
        # The suffix has to be an f-string: a plain string would emit the
        # literal text "({detail})" instead of the actual detail.
        suffix = f" ({detail})" if detail else ""
        super().__init__(f"Pilot (stamp) {pilot_stamp} not found{suffix}")
55+
56+
4957
class SandboxNotFoundError(Exception):
5058
def __init__(self, pfn: str, se_name: str, detail: str | None = None):
5159
self.pfn: str = pfn

diracx-db/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ TaskQueueDB = "diracx.db.sql:TaskQueueDB"
3737

3838
[project.entry-points."diracx.db.os"]
3939
JobParametersDB = "diracx.db.os:JobParametersDB"
40+
PilotLogsDB = "diracx.db.os:PilotLogsDB"
4041

4142
[tool.setuptools.packages.find]
4243
where = ["src"]
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
from __future__ import annotations
22

3-
__all__ = ("JobParametersDB",)
3+
__all__ = (
4+
"JobParametersDB",
5+
"PilotLogsDB",
6+
)
47

58
from .job_parameters import JobParametersDB
9+
from .pilot_logs import PilotLogsDB
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from __future__ import annotations
2+
3+
from diracx.db.os.utils import BaseOSDB
4+
5+
6+
class PilotLogsDB(BaseOSDB):
    """Document store definition for pilot log records.

    Declares the field mapping of a log record and how records are spread
    over indices sharing the ``pilot_logs`` prefix.
    """

    # Field name -> mapping type of each attribute stored per log record.
    fields = {
        "PilotStamp": {"type": "keyword"},
        "PilotID": {"type": "long"},
        "SubmissionTime": {"type": "date"},
        "LineNumber": {"type": "long"},
        "Message": {"type": "text"},
        "VO": {"type": "keyword"},
        "timestamp": {"type": "date"},
    }
    index_prefix = "pilot_logs"

    def index_name(self, doc_id: int) -> str:
        """Return the name of the index that holds ``doc_id``.

        Documents are bucketed by blocks of one million IDs, e.g. IDs
        0..999999 map to ``pilot_logs_0`` and 1000000..1999999 to
        ``pilot_logs_1``.

        Args:
            doc_id: Integer document identifier.

        Returns:
            ``"<index_prefix>_<doc_id // 1_000_000>"``.
        """
        # TODO decide how to define the index name
        # use pilot ID
        # Integer floor division keeps the bucket exact for arbitrarily
        # large IDs; dividing by the float 1e6 would round above 2**53.
        return f"{self.index_prefix}_{doc_id // 1_000_000}"

diracx-db/src/diracx/db/os/utils.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from typing import Any, Self
1414

1515
from opensearchpy import AsyncOpenSearch
16+
from opensearchpy.helpers import async_bulk
1617

1718
from diracx.core.exceptions import InvalidQueryError
1819
from diracx.core.extensions import select_from_extension
@@ -190,6 +191,13 @@ async def upsert(self, doc_id, document) -> None:
190191
)
191192
print(f"{response=}")
192193

194+
async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
    """Insert many documents into ``index_name`` with one bulk request.

    Args:
        index_name: Index every document is written to.
        docs: Documents to insert. They are not modified: each bulk action
            is a copy of the document with ``_index`` added.
    """
    actions = [doc | {"_index": index_name} for doc in docs]
    # async_bulk returns a (success_count, errors) tuple. Only the count is
    # logged; handing the whole tuple to "%d" would break log formatting.
    n_inserted, _ = await async_bulk(self.client, actions=actions)
    logger.info("Inserted %d documents to %r", n_inserted, index_name)
200+
193201
async def search(
194202
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
195203
) -> list[dict[str, Any]]:
@@ -231,6 +239,17 @@ async def search(
231239

232240
return hits
233241

242+
async def delete(self, query: list[dict[str, Any]]) -> dict:
    """Delete multiple documents by query.

    Args:
        query: Search filters selecting the documents to remove.

    Returns:
        The response of ``delete_by_query`` run against every index that
        matches ``index_prefix``, or an empty dict when ``query`` is empty.
    """
    # NOTE(review): assumes the delete request was nested under
    # "if query:" in the original layout - confirm.
    if not query:
        # No filters given: nothing is sent to the backend.
        return {}

    filters = apply_search_filters(self.fields, query)
    return await self.client.delete_by_query(
        body={"query": filters}, index=f"{self.index_prefix}*"
    )
252+
234253

235254
def require_type(operator, field_name, field_type, allowed_types):
236255
if field_type not in allowed_types:

diracx-db/src/diracx/db/sql/job/db.py

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
SortSpec,
1717
)
1818

19-
from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints
19+
from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints, get_columns
2020
from .schema import (
2121
InputData,
2222
JobCommands,
@@ -26,17 +26,6 @@
2626
)
2727

2828

29-
def _get_columns(table, parameters):
30-
columns = [x for x in table.columns]
31-
if parameters:
32-
if unrecognised_parameters := set(parameters) - set(table.columns.keys()):
33-
raise InvalidQueryError(
34-
f"Unrecognised parameters requested {unrecognised_parameters}"
35-
)
36-
columns = [c for c in columns if c.name in parameters]
37-
return columns
38-
39-
4029
class JobDB(BaseSQLDB):
4130
metadata = JobDBBase.metadata
4231

@@ -46,7 +35,7 @@ class JobDB(BaseSQLDB):
4635
jdl_2_db_parameters = ["JobName", "JobType", "JobGroup"]
4736

4837
async def summary(self, group_by, search) -> list[dict[str, str | int]]:
49-
columns = _get_columns(Jobs.__table__, group_by)
38+
columns = get_columns(Jobs.__table__, group_by)
5039

5140
stmt = select(*columns, func.count(Jobs.job_id).label("count"))
5241
stmt = apply_search_filters(Jobs.__table__.columns.__getitem__, stmt, search)
@@ -70,7 +59,7 @@ async def search(
7059
page: int | None = None,
7160
) -> tuple[int, list[dict[Any, Any]]]:
7261
# Find which columns to select
73-
columns = _get_columns(Jobs.__table__, parameters)
62+
columns = get_columns(Jobs.__table__, parameters)
7463

7564
stmt = select(*columns)
7665

@@ -328,7 +317,7 @@ async def set_properties(
328317
required_parameters = list(required_parameters_set)[0]
329318
update_parameters = [{"job_id": k, **v} for k, v in properties.items()]
330319

331-
columns = _get_columns(Jobs.__table__, required_parameters)
320+
columns = get_columns(Jobs.__table__, required_parameters)
332321
values: dict[str, BindParameter[Any] | datetime] = {
333322
c.name: bindparam(c.name) for c in columns
334323
}

diracx-db/src/diracx/db/sql/pilot_agents/db.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
from __future__ import annotations
22

33
from datetime import datetime, timezone
4+
from typing import Any
45

5-
from sqlalchemy import insert
6+
from sqlalchemy import func, insert, select
67

7-
from ..utils import BaseSQLDB
8+
from diracx.core.exceptions import InvalidQueryError
9+
from diracx.core.models import (
10+
SearchSpec,
11+
SortSpec,
12+
)
13+
14+
from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints, get_columns
815
from .schema import PilotAgents, PilotAgentsDBBase
916

1017

@@ -44,3 +51,46 @@ async def add_pilot_references(
4451
stmt = insert(PilotAgents).values(values)
4552
await self.conn.execute(stmt)
4653
return
54+
55+
async def search(
    self,
    parameters: list[str] | None,
    search: list[SearchSpec],
    sorts: list[SortSpec],
    *,
    distinct: bool = False,
    per_page: int = 100,
    page: int | None = None,
) -> tuple[int, list[dict[Any, Any]]]:
    """Search the ``PilotAgents`` table with filtering, sorting and paging.

    Args:
        parameters: Column names to return; a falsy value selects all columns.
        search: Filter specifications applied to the statement.
        sorts: Sort specifications applied to the statement.
        distinct: When true, duplicate rows are removed.
        per_page: Page size; only used when ``page`` is given.
        page: 1-based page number, or ``None`` to return every match.

    Returns:
        A ``(total, rows)`` tuple: ``total`` counts all matching rows
        regardless of pagination, ``rows`` holds one dict per returned row.

    Raises:
        InvalidQueryError: If ``page`` is given and ``page`` or ``per_page``
            is smaller than 1.
    """
    # Reject invalid pagination up front so that no query is sent to the
    # database for a request that is going to fail anyway.
    if page is not None:
        if page < 1:
            raise InvalidQueryError("Page must be a positive integer")
        if per_page < 1:
            raise InvalidQueryError("Per page must be a positive integer")

    column_of = PilotAgents.__table__.columns.__getitem__

    # Find which columns to select
    stmt = select(*get_columns(PilotAgents.__table__, parameters))
    stmt = apply_search_filters(column_of, stmt, search)
    stmt = apply_sort_constraints(column_of, stmt, sorts)

    if distinct:
        stmt = stmt.distinct()

    # Calculate total count before applying pagination
    total_count_stmt = select(func.count()).select_from(stmt.alias())
    total = (await self.conn.execute(total_count_stmt)).scalar_one()

    # Apply pagination (arguments were validated above)
    if page is not None:
        stmt = stmt.offset((page - 1) * per_page).limit(per_page)

    # Execute the query
    return total, [
        dict(row._mapping) async for row in (await self.conn.stream(stmt))
    ]

diracx-db/src/diracx/db/sql/utils/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
SQLDBUnavailableError,
66
apply_search_filters,
77
apply_sort_constraints,
8+
get_columns,
89
)
910
from .functions import substract_date, utcnow
1011
from .types import Column, DateNowColumn, EnumBackedBool, EnumColumn, NullColumn
@@ -19,6 +20,7 @@
1920
"EnumColumn",
2021
"apply_search_filters",
2122
"apply_sort_constraints",
23+
"get_columns",
2224
"substract_date",
2325
"SQLDBUnavailableError",
2426
)

diracx-db/src/diracx/db/sql/utils/base.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,17 @@ def find_time_resolution(value):
258258
raise InvalidQueryError(f"Cannot parse {value=}")
259259

260260

261+
def get_columns(table, parameters):
    """Return the columns of ``table`` to select, optionally restricted by name.

    Args:
        table: Object whose ``columns`` attribute iterates over column
            objects (each with a ``name``) and offers ``keys()`` listing
            the valid column names.
        parameters: Column names to keep, or a falsy value (``None`` or
            empty) to keep every column.

    Returns:
        A new list of column objects in the table's own column order.

    Raises:
        InvalidQueryError: If ``parameters`` contains a name that is not
            one of the table's column keys.
    """
    columns = list(table.columns)
    if not parameters:
        return columns

    # Build the set once: it drives both the validation below and the
    # constant-time membership test in the final filter.
    requested = set(parameters)
    if unrecognised_parameters := requested - set(table.columns.keys()):
        raise InvalidQueryError(
            f"Unrecognised parameters requested {unrecognised_parameters}"
        )
    return [c for c in columns if c.name in requested]
270+
271+
261272
def apply_search_filters(column_mapping, stmt, search):
262273
for query in search:
263274
try:

diracx-routers/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ types = [
4747
]
4848

4949
[project.entry-points."diracx.services"]
50+
pilots = "diracx.routers.pilots:router"
5051
jobs = "diracx.routers.jobs:router"
5152
config = "diracx.routers.configuration:router"
5253
auth = "diracx.routers.auth:router"
@@ -55,6 +56,7 @@ auth = "diracx.routers.auth:router"
5556
[project.entry-points."diracx.access_policies"]
5657
WMSAccessPolicy = "diracx.routers.jobs.access_policies:WMSAccessPolicy"
5758
SandboxAccessPolicy = "diracx.routers.jobs.access_policies:SandboxAccessPolicy"
59+
PilotLogsAccessPolicy = "diracx.routers.pilots.access_policies:PilotLogsAccessPolicy"
5860

5961
# Minimum version of the client supported
6062
[project.entry-points."diracx.min_client_version"]

0 commit comments

Comments
 (0)