Skip to content

Commit 02c24a9

Browse files
committed
feat: enable remote pilot logging system
1 parent 5790482 commit 02c24a9

9 files changed

Lines changed: 152 additions & 2 deletions

File tree

diracx-db/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ TaskQueueDB = "diracx.db.sql:TaskQueueDB"
3636

3737
[project.entry-points."diracx.db.os"]
3838
JobParametersDB = "diracx.db.os:JobParametersDB"
39+
PilotLogsDB = "diracx.db.os:PilotLogsDB"
3940

4041
[tool.setuptools.packages.find]
4142
where = ["src"]
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
from __future__ import annotations
22

3-
__all__ = ("JobParametersDB",)
3+
__all__ = (
4+
"JobParametersDB",
5+
"PilotLogsDB",
6+
)
47

58
from .job_parameters import JobParametersDB
9+
from .pilot_logs import PilotLogsDB
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from __future__ import annotations
2+
3+
from diracx.db.os.utils import BaseOSDB
4+
5+
6+
class PilotLogsDB(BaseOSDB):
7+
fields = {
8+
"PilotStamp": {"type": "keyword"},
9+
"LineNumber": {"type": "long"},
10+
"Message": {"type": "text"},
11+
"VO": {"type": "keyword"},
12+
"timestamp": {"type": "date"},
13+
}
14+
index_prefix = "pilot_logs"
15+
16+
def index_name(self, doc_id: int) -> str:
17+
# TODO decide how to define the index name
18+
return f"{self.index_prefix}_0"

diracx-db/src/diracx/db/os/utils.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing import Any, AsyncIterator, Self
1313

1414
from opensearchpy import AsyncOpenSearch
15+
from opensearchpy.helpers import async_bulk
1516

1617
from diracx.core.exceptions import InvalidQueryError
1718
from diracx.core.extensions import select_from_extension
@@ -189,6 +190,13 @@ async def upsert(self, doc_id, document) -> None:
189190
)
190191
print(f"{response=}")
191192

193+
async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
194+
# bulk inserting to database
195+
n_inserted = await async_bulk(
196+
self.client, actions=[doc | {"_index": index_name} for doc in docs]
197+
)
198+
logger.info("Inserted %s documents to %s", n_inserted, index_name)
199+
192200
async def search(
193201
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
194202
) -> list[dict[str, Any]]:

diracx-routers/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ types = [
4747
]
4848

4949
[project.entry-points."diracx.services"]
50+
pilotlogs = "diracx.routers.pilot_logging.remote_logger:router"
5051
jobs = "diracx.routers.job_manager:router"
5152
config = "diracx.routers.configuration:router"
5253
auth = "diracx.routers.auth:router"
@@ -55,6 +56,7 @@ auth = "diracx.routers.auth:router"
5556
[project.entry-points."diracx.access_policies"]
5657
WMSAccessPolicy = "diracx.routers.job_manager.access_policies:WMSAccessPolicy"
5758
SandboxAccessPolicy = "diracx.routers.job_manager.access_policies:SandboxAccessPolicy"
59+
PilotLogsAccessPolicy = "diracx.routers.pilot_logging.access_policies:PilotLogsAccessPolicy"
5860

5961

6062
[tool.setuptools.packages.find]

diracx-routers/src/diracx/routers/dependencies.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
"JobLoggingDB",
88
"SandboxMetadataDB",
99
"TaskQueueDB",
10+
"JobParametersDB",
11+
"PilotLogsDB",
1012
"add_settings_annotation",
1113
"AvailableSecurityProperties",
1214
)
@@ -18,6 +20,8 @@
1820
from diracx.core.config import Config as _Config
1921
from diracx.core.config import ConfigSource
2022
from diracx.core.properties import SecurityProperty
23+
from diracx.db.os import JobParametersDB as _JobParametersDB
24+
from diracx.db.os import PilotLogsDB as _PilotLogsDB
2125
from diracx.db.sql import AuthDB as _AuthDB
2226
from diracx.db.sql import JobDB as _JobDB
2327
from diracx.db.sql import JobLoggingDB as _JobLoggingDB
@@ -32,7 +36,7 @@ def add_settings_annotation(cls: T) -> T:
3236
return Annotated[cls, Depends(cls.create)] # type: ignore
3337

3438

35-
# Databases
39+
# SQL Databases
3640
AuthDB = Annotated[_AuthDB, Depends(_AuthDB.transaction)]
3741
JobDB = Annotated[_JobDB, Depends(_JobDB.transaction)]
3842
JobLoggingDB = Annotated[_JobLoggingDB, Depends(_JobLoggingDB.transaction)]
@@ -41,6 +45,10 @@ def add_settings_annotation(cls: T) -> T:
4145
]
4246
TaskQueueDB = Annotated[_TaskQueueDB, Depends(_TaskQueueDB.transaction)]
4347

48+
# OpenSearch Databases
49+
JobParametersDB = Annotated[_JobParametersDB, Depends(_JobParametersDB.session)]
50+
PilotLogsDB = Annotated[_PilotLogsDB, Depends(_PilotLogsDB.session)]
51+
4452
# Miscellaneous
4553
Config = Annotated[_Config, Depends(ConfigSource.create)]
4654
AvailableSecurityProperties = Annotated[
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
5+
logger = logging.getLogger(__name__)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from __future__ import annotations
2+
3+
from enum import StrEnum, auto
4+
from typing import Annotated, Callable
5+
6+
from fastapi import Depends, HTTPException, status
7+
8+
from diracx.core.properties import GENERIC_PILOT, OPERATOR, PILOT, SERVICE_ADMINISTRATOR
9+
from diracx.db.os import PilotLogsDB
10+
from diracx.routers.access_policies import BaseAccessPolicy
11+
12+
from ..utils.users import AuthorizedUserInfo
13+
14+
15+
class ActionType(StrEnum):
16+
#: Create/update pilot log records
17+
CREATE = auto()
18+
#: download pilot logs
19+
READ = auto()
20+
#: delete pilot logs
21+
DELETE = auto()
22+
#: Search
23+
QUERY = auto()
24+
25+
26+
class PilotLogsAccessPolicy(BaseAccessPolicy):
27+
"""ToDo
28+
----
29+
30+
"""
31+
32+
@staticmethod
33+
async def policy(
34+
policy_name: str,
35+
user_info: AuthorizedUserInfo,
36+
/,
37+
*,
38+
action: ActionType | None = None,
39+
pilot_db: PilotLogsDB | None = None,
40+
pilot_ids: list[int] | None = None, # or pilot stamp list ?
41+
):
42+
print("user_info.properties:", user_info.properties)
43+
assert action, "action is a mandatory parameter"
44+
assert pilot_db, "pilot_db is a mandatory parameter"
45+
46+
if GENERIC_PILOT in user_info.properties:
47+
return
48+
if PILOT in user_info.properties:
49+
return
50+
if SERVICE_ADMINISTRATOR in user_info.properties:
51+
return
52+
if OPERATOR in user_info.properties:
53+
return
54+
55+
raise HTTPException(status.HTTP_403_FORBIDDEN, detail=user_info.properties)
56+
57+
58+
CheckPilotLogsPolicyCallable = Annotated[Callable, Depends(PilotLogsAccessPolicy.check)]
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel
4+
5+
from ..dependencies import PilotLogsDB
6+
from ..fastapi_classes import DiracxRouter
7+
from ..pilot_logging import logger
8+
from .access_policies import ActionType, CheckPilotLogsPolicyCallable
9+
10+
router = DiracxRouter()
11+
12+
13+
class LogLine(BaseModel):
14+
line_no: int
15+
line: str
16+
17+
18+
class LogMessage(BaseModel):
19+
pilot_stamp: str
20+
lines: list[LogLine]
21+
vo: str
22+
23+
24+
@router.post("/")
25+
async def send_message(
26+
data: LogMessage,
27+
pilot_logs_db: PilotLogsDB,
28+
check_permissions: CheckPilotLogsPolicyCallable,
29+
):
30+
logger.warning(f"Message received '{data}'")
31+
await check_permissions(action=ActionType.CREATE, pilot_db=pilot_logs_db)
32+
33+
pilot_id = 1234 # need to get pilot id from pilot_stamp (via pilot DB)
34+
35+
docs = []
36+
for line in data.lines:
37+
docs.append(
38+
{
39+
"PilotStamp": data.pilot_stamp,
40+
"VO": data.vo,
41+
"LineNumber": line.line_no,
42+
"Message": line.line,
43+
}
44+
)
45+
await pilot_logs_db.bulk_insert(pilot_logs_db.index_name(pilot_id), docs)
46+
return data

0 commit comments

Comments
 (0)