Skip to content

Commit 03c4189

Browse files
authored
feat: jwt fetching and refreshing (#44)
* Added SubmissionAPISession class implemented as a context manager to dynamically manage token refreshing * Added unit testing file for SubmissionAPISession, added tests for login and refresh * Removed global credentials from submission_api_session file * Updated session request method to allow for token requests in body as well as header * Added tests for request method - successful first time and retries upon encoutering a 401 * Added unit tests for logout method and context manager functionality * Fixed _set_command method in AnalyticsTES to use correct cli options for SQL runner. Added entrypoint to examples/analysis_examples.py * Updated minio_client to use submission_api_session as opposed to a static token string * Updated submit_task method in BaseTesClient so that token_session is injected and used to make the post request * Updated AnalysisOrchestrator to accept SubmissionAPISession rather than static token and have this object injected accordingly in its methods * Added skeleton integration and e2e tests for SubmissionAPIToken * Added submission api credentials to the .env.example and have these initialised as defaults in the SubmissionAPISession constructor * Added integration tests for SubmissionAPISession testing that login and refresh is successful on deployed 5STES version * Added integration tests for minio client * Update AnalysisRunner:run_analysis method to use SubmissionAPISession as a context manager * Updated docstrings for SubmissionAPISession, added additional minio integration test, updated analysis runner to use context managed token session * Updated pytest.ini to exclude integration tests by default, and tidied up integration tests * Moved the error code parsing for invalid token from MinioClient._exchange_token_for_credentials to SubmisionAPISession.request to fix bug with access_token refreshing - updated tests accordingly. * Added urlparse to minioclient * Small refactor of SubmissionAPISession.request to include request sending and error handling is separate methods * Use SubmissionAPIToken in metadata_runner responsible for running bunny on 5STES * Reset analytics_tes.py to version on main for compatability with python container * Modified constructor of SubmissionAPIToken to use a base keycloak url and build the token and logout endpoints from this using the standard keycloak endpoint patterns * Updated example .env
1 parent f4d7288 commit 03c4189

12 files changed

Lines changed: 780 additions & 186 deletions

.github/workflows/release.bunny.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ name: Publish a Bunny CLI image compatible with Five Safes TES.
66
on:
77
schedule:
88
- cron: "0 0 * * *"
9-
push:
10-
branches: ["add-ci-build-for-bunny-analytics-image"]
119
env:
1210
image-name: five-safes-tes-analytics-bunny-cli
1311
repo-owner: ${{ github.repository_owner }}

analysis_orchestrator.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
1-
import json
21
import time
32
import os
43
import tes
54
from typing import List, Dict, Any, Tuple
65
from dotenv import load_dotenv
6+
77
from tes_client import BaseTESClient
8-
from tes_client import get_status_description
98
from minio_client import MinIOClient
109
import polling
11-
from string import Template
10+
from submission_api_session import SubmissionAPISession
11+
1212

13-
# Load environment variables from .env file
1413
load_dotenv()
1514

1615
class AnalysisOrchestrator:
@@ -20,7 +19,12 @@ class AnalysisOrchestrator:
2019
Analysis-specific logic is handled by the AnalysisRunner class.
2120
"""
2221

23-
def __init__(self, tes_client: BaseTESClient, token: str = None, project: str = None):
22+
def __init__(
23+
self,
24+
tes_client: BaseTESClient,
25+
token_session: SubmissionAPISession,
26+
project: str = None
27+
):
2428
"""
2529
Initialize the analysis orchestrator.
2630
@@ -29,21 +33,15 @@ def __init__(self, tes_client: BaseTESClient, token: str = None, project: str =
2933
token (str): Authentication token for TRE-FX services
3034
project (str): Project name for TES tasks (defaults to 5STES_PROJECT env var)
3135
"""
32-
if token is None:
33-
token = os.getenv('5STES_TOKEN')
34-
if not token:
35-
raise ValueError("5STES_TOKEN environment variable is required when token parameter is not provided")
36-
self.token = token
37-
38-
# Set project from environment variable if not provided
3936
if project is None:
4037
project = os.getenv('5STES_PROJECT')
4138
if not project:
4239
raise ValueError("5STES_PROJECT environment variable is required when project parameter is not provided")
43-
40+
41+
self.token_session = token_session
4442
self.project = project
4543
self.tes_client = tes_client
46-
self.minio_client = MinIOClient(token)
44+
self.minio_client = MinIOClient(token_session=token_session)
4745

4846
def parse_tres(self, tres: str) -> List[str]:
4947
"""
@@ -112,7 +110,7 @@ def _submit_and_collect_results(self,
112110
else:
113111
print(f"Submitting task to {n_results} TREs...")
114112

115-
result = self.tes_client.submit_task(tes_message, self.token)
113+
result = self.tes_client.submit_task(tes_message, token_session=self.token_session)
116114

117115
task_id = result['id']
118116
print(f"Task ID: {task_id}")

analysis_runner.py

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,17 @@
44
from tes_client import BaseTESClient
55
from data_processor import DataProcessor
66
from statistical_analyzer import StatisticalAnalyzer
7+
from submission_api_session import SubmissionAPISession
78
import numpy as np
89
import os
910
from string import Template
1011

1112

1213
class AnalysisRunner:
13-
def __init__(self,
14-
tes_client: BaseTESClient = AnalyticsTES(),
15-
token: str = None,
16-
project: str = None):
17-
self.analysis_orchestrator = AnalysisOrchestrator(tes_client=tes_client, token=token, project=project)
18-
self.tes_client = self.analysis_orchestrator.tes_client
14+
def __init__(self, tes_client: BaseTESClient = AnalyticsTES(), project: str = None):
15+
self.analysis_orchestrator = None
16+
self.tes_client = tes_client
17+
self.project = project
1918
# Own instances for aggregation and analysis
2019
self.data_processor = DataProcessor()
2120
self.statistical_analyzer = StatisticalAnalyzer()
@@ -43,57 +42,58 @@ def run_analysis(self,
4342
Returns:
4443
Dict[str, Any]: Analysis results
4544
"""
45+
with SubmissionAPISession() as token_session:
46+
self.analysis_orchestrator = AnalysisOrchestrator(self.tes_client, token_session=token_session, project=self.project)
4647

47-
task_name, task_description, bucket, tres = self.analysis_orchestrator.setup_analysis(analysis_type, task_name, task_description, bucket, tres)
48-
49-
# Check if we should run on existing data (returns early if so)
50-
existing_data_result = self.check_analysis_on_existing_data(analysis_type, user_query, tres)
51-
if existing_data_result is not None:
52-
return existing_data_result
53-
54-
### create the TES message for the analysis
55-
56-
self.tes_client.set_tes_messages(
57-
query=user_query,
58-
analysis_type=analysis_type,
59-
task_name=task_name,
60-
task_description=task_description,
61-
output_format="json",
62-
)
63-
self.tes_client.set_tags(tres=self.analysis_orchestrator.tres)
64-
five_Safes_TES_message = self.tes_client.create_FiveSAFES_TES_message()
65-
66-
67-
# Submit task and collect results (common workflow)
68-
try:
69-
task_id, data = self.analysis_orchestrator._submit_and_collect_results(
70-
five_Safes_TES_message,
71-
bucket,
72-
output_format="json",
73-
submit_message=f"Submitting {analysis_type} analysis to {len(self.analysis_orchestrator.tres)} TREs..."
74-
)
75-
76-
# Process and analyze data (aggregation moved to this class)
77-
print("Processing and analyzing data...")
78-
raw_aggregated_data = self.data_processor.aggregate_data(data, analysis_type)
48+
task_name, task_description, bucket, tres = self.analysis_orchestrator.setup_analysis(analysis_type, task_name, task_description, bucket, tres)
7949

80-
analysis_result = self.statistical_analyzer.analyze_data(raw_aggregated_data, analysis_type)
50+
# Check if we should run on existing data (returns early if so)
51+
existing_data_result = self.check_analysis_on_existing_data(analysis_type, user_query, tres)
52+
if existing_data_result is not None:
53+
return existing_data_result
8154

82-
# Store the aggregated values in the centralized dict
83-
self._store_aggregated_values(analysis_type)
55+
### create the TES message for the analysis
8456

85-
return {
86-
'analysis_type': analysis_type,
87-
'result': analysis_result,
88-
'task_id': task_id,
89-
'tres_used': tres,
90-
'data_sources': len(data),
91-
'complete_query': user_query
92-
}
93-
94-
except Exception as e:
95-
print(f"Analysis failed: {str(e)}")
96-
raise
57+
self.tes_client.set_tes_messages(
58+
query=user_query,
59+
analysis_type=analysis_type,
60+
task_name=task_name,
61+
task_description=task_description,
62+
output_format="json",
63+
)
64+
self.tes_client.set_tags(tres=self.analysis_orchestrator.tres)
65+
five_Safes_TES_message = self.tes_client.create_FiveSAFES_TES_message()
66+
67+
# Submit task and collect results (common workflow)
68+
try:
69+
task_id, data = self.analysis_orchestrator._submit_and_collect_results(
70+
five_Safes_TES_message,
71+
bucket,
72+
output_format="json",
73+
submit_message=f"Submitting {analysis_type} analysis to {len(self.analysis_orchestrator.tres)} TREs..."
74+
)
75+
76+
# Process and analyze data (aggregation moved to this class)
77+
print("Processing and analyzing data...")
78+
raw_aggregated_data = self.data_processor.aggregate_data(data, analysis_type)
79+
80+
analysis_result = self.statistical_analyzer.analyze_data(raw_aggregated_data, analysis_type)
81+
82+
# Store the aggregated values in the centralized dict
83+
self._store_aggregated_values(analysis_type)
84+
85+
return {
86+
'analysis_type': analysis_type,
87+
'result': analysis_result,
88+
'task_id': task_id,
89+
'tres_used': tres,
90+
'data_sources': len(data),
91+
'complete_query': user_query
92+
}
93+
94+
except Exception as e:
95+
print(f"Analysis failed: {str(e)}")
96+
raise
9797

9898
def _store_aggregated_values(self, analysis_type: str):
9999
"""

env.example

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@
22
# Copy this file to .env and update with your actual values
33
# ALL VARIABLES BELOW ARE REQUIRED - the application will fail to start without them
44

5-
# Authentication
6-
5STES_TOKEN=your_jwt_token_here
5+
# Authentication
6+
# Token and Logout URLs are derived from the SubmissionAPIBaseKeyCloakUrl
7+
SubmissionAPIKeyCloakClientId=your-keycloak_client-id
8+
SubmissionAPIKeyCloakSecret=your-keycloak-secret
9+
SubmissionAPIKeyCloakUsername=your-keycloak-user
10+
SubmissionAPIKeyCloakPassword=your-keycloak-password
11+
SubmissionAPIBaseKeyCloakUrl=https://<host>/realms/<realm>
12+
713
5STES_PROJECT=your_project_name
814

915
# TES (Task Execution Service) Configuration

examples/analysis_examples.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
1-
# Example usage functions for common scenarios
1+
"""
2+
Example usage functions for common scenarios.
3+
"""
4+
import os
5+
from string import Template
6+
from typing import List, Dict, Any
7+
8+
from analytics_tes import AnalyticsTES
9+
from analysis_runner import AnalysisRunner
10+
11+
212
def run_mean_analysis_example(analysis_runner: AnalysisRunner, concept_id: int, tres: List[str] = None) -> Dict[str, Any]:
313
"""
414
Example function showing how to run a mean analysis.
@@ -16,7 +26,12 @@ def run_mean_analysis_example(analysis_runner: AnalysisRunner, concept_id: int,
1626
WHERE measurement_concept_id = $concept_id
1727
AND value_as_number IS NOT NULL""")
1828
user_query = query_template.safe_substitute(schema=sql_schema, concept_id=concept_id)
19-
return analysis_runner.run_analysis("mean", user_query=user_query, tres=tres)
29+
return analysis_runner.run_analysis(
30+
analysis_type="mean",
31+
task_name="DEMO: mean analysis test",
32+
user_query=user_query,
33+
tres=tres
34+
)
2035

2136

2237
def run_variance_analysis_example(analysis_runner: AnalysisRunner, concept_id: int, tres: List[str] = None) -> Dict[str, Any]:
@@ -97,4 +112,14 @@ def run_chi_squared_analysis_example(analysis_runner: AnalysisRunner, tres: List
97112
WHERE p.race_concept_id IN (38003574, 38003584)""")
98113

99114
user_query = query_template.safe_substitute(schema=sql_schema)
100-
return analysis_runner.run_analysis("chi_squared_scipy", user_query=user_query, tres=tres)
115+
return analysis_runner.run_analysis("chi_squared_scipy", user_query=user_query, tres=tres)
116+
117+
118+
if __name__ == "__main__":
119+
tes_client = AnalyticsTES()
120+
runner = AnalysisRunner(tes_client=tes_client)
121+
122+
run_mean_analysis_example(
123+
analysis_runner=runner,
124+
concept_id="43055141",
125+
)

metadata_runner.py

Lines changed: 55 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@
33
from bunny_tes import BunnyTES
44
from tes_client import BaseTESClient
55
from data_processor import DataProcessor
6+
from submission_api_session import SubmissionAPISession
67
import os
78

89
class MetadataRunner:
9-
def __init__(self,
10-
tes_client: BaseTESClient = BunnyTES(),
11-
token: str = None,
12-
project: str = None):
13-
self.analysis_orchestrator = AnalysisOrchestrator(tes_client=tes_client, token=token, project=project)
14-
self.tes_client = self.analysis_orchestrator.tes_client
10+
def __init__(self, tes_client: BaseTESClient = BunnyTES(), project: str = None):
11+
self.analysis_orchestrator = None
12+
self.tes_client = tes_client
13+
self.project = project
1514
## don't know whether to use the same processor, or create a new one.
1615
self.data_processor = DataProcessor()
1716

@@ -29,54 +28,56 @@ def get_metadata(self,
2928

3029
analysis_type = "metadata"
3130

32-
task_name, task_description, bucket, tres = self.analysis_orchestrator.setup_analysis(analysis_type, task_name, task_description, bucket, tres)
33-
34-
### create the TES message for the metadata
35-
metadata_tes = self.analysis_orchestrator.tes_client
36-
metadata_tes.set_tes_messages(task_name=task_name, task_description=task_description)
37-
metadata_tes.set_tags(tres=self.analysis_orchestrator.tres)
38-
metadata_tes.create_FiveSAFES_TES_message()
39-
40-
try:
41-
task_id, data = self.analysis_orchestrator._submit_and_collect_results(
42-
metadata_tes.task,
43-
bucket,
44-
output_format="json",
45-
submit_message=f"Submitting {analysis_type} analysis to {len(self.analysis_orchestrator.tres)} TREs..."
46-
)
47-
48-
# Process and analyze data
49-
print("Processing and analyzing data...")
50-
51-
# TODO: Implement proper metadata aggregation
52-
# For now, just pass through the raw data as a placeholder
53-
# The aggregation logic for metadata is different from analytics
54-
# and needs to be implemented separately
55-
raw_aggregated_data = data # Placeholder: no aggregation yet
56-
57-
## placeholder for now, this is where the postprocessing will go.
58-
#analysis_result = self.statistical_analyzer.analyze_data(raw_aggregated_data, analysis_type)
59-
metadata_result = self.postprocess_metadata(raw_aggregated_data)
60-
61-
# Store the aggregated values in the centralized dict
62-
# Note: Metadata storage may differ from analysis results
63-
if isinstance(raw_aggregated_data, dict):
64-
self.aggregated_data.update(raw_aggregated_data)
65-
else:
66-
self.aggregated_data['raw_data'] = raw_aggregated_data
67-
68-
69-
return {
70-
'analysis_type': "metadata",
71-
'result': metadata_result,
72-
'task_id': task_id,
73-
'tres_used': tres,
74-
'data_sources': len(data)
75-
}
76-
77-
except Exception as e:
78-
print(f"Metadata analysis failed: {str(e)}")
79-
raise
31+
with SubmissionAPISession() as token_session:
32+
self.analysis_orchestrator = AnalysisOrchestrator(self.tes_client, token_session, self.project)
33+
task_name, task_description, bucket, tres = self.analysis_orchestrator.setup_analysis(analysis_type, task_name, task_description, bucket, tres)
34+
35+
### create the TES message for the metadata
36+
metadata_tes = self.tes_client
37+
metadata_tes.set_tes_messages(task_name=task_name, task_description=task_description)
38+
metadata_tes.set_tags(tres=self.analysis_orchestrator.tres)
39+
metadata_tes.create_FiveSAFES_TES_message()
40+
41+
try:
42+
task_id, data = self.analysis_orchestrator._submit_and_collect_results(
43+
metadata_tes.task,
44+
bucket,
45+
output_format="json",
46+
submit_message=f"Submitting {analysis_type} analysis to {len(self.analysis_orchestrator.tres)} TREs..."
47+
)
48+
49+
# Process and analyze data
50+
print("Processing and analyzing data...")
51+
52+
# TODO: Implement proper metadata aggregation
53+
# For now, just pass through the raw data as a placeholder
54+
# The aggregation logic for metadata is different from analytics
55+
# and needs to be implemented separately
56+
raw_aggregated_data = data # Placeholder: no aggregation yet
57+
58+
## placeholder for now, this is where the postprocessing will go.
59+
#analysis_result = self.statistical_analyzer.analyze_data(raw_aggregated_data, analysis_type)
60+
metadata_result = self.postprocess_metadata(raw_aggregated_data)
61+
62+
# Store the aggregated values in the centralized dict
63+
# Note: Metadata storage may differ from analysis results
64+
if isinstance(raw_aggregated_data, dict):
65+
self.aggregated_data.update(raw_aggregated_data)
66+
else:
67+
self.aggregated_data['raw_data'] = raw_aggregated_data
68+
69+
70+
return {
71+
'analysis_type': "metadata",
72+
'result': metadata_result,
73+
'task_id': task_id,
74+
'tres_used': tres,
75+
'data_sources': len(data)
76+
}
77+
78+
except Exception as e:
79+
print(f"Metadata analysis failed: {str(e)}")
80+
raise
8081

8182

8283
def postprocess_metadata(self, raw_data: Any) -> Any:

0 commit comments

Comments
 (0)