Skip to content

Commit 20a7b24

Browse files
author
nazarfil
committed
exposes current pipeline code, type
1 parent 176a805 commit 20a7b24

3 files changed

Lines changed: 50 additions & 1 deletion

File tree

openhexa/sdk/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""SDK package."""
22

33
from .datasets import Dataset
4-
from .pipelines import current_run, parameter, pipeline
4+
from .pipelines import current_pipeline, current_run, parameter, pipeline
55
from .pipelines.parameter import DHIS2Widget, IASOWidget
66
from .utils import OpenHexaClient
77
from .workspaces import workspace
@@ -17,6 +17,7 @@
1717
__all__ = [
1818
"workspace",
1919
"pipeline",
20+
"current_pipeline",
2021
"parameter",
2122
"current_run",
2223
"DHIS2Connection",

openhexa/sdk/pipelines/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"Pipeline",
1616
"parameter",
1717
"current_run",
18+
"current_pipeline",
1819
"import_pipeline",
1920
"download_pipeline",
2021
"get_local_workspace_config",
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""Current Pipeline context module."""
2+
import os
3+
4+
from openhexa.sdk.utils import Environment, get_environment
5+
6+
7+
class PipelineConfigError(Exception):
8+
"""Raised whenever the system cannot find an environment variable required to configure the current pipeline."""
9+
10+
pass
11+
12+
13+
class CurrentPipeline:
14+
"""It is used to view manage the current pipeline state and operations."""
15+
16+
@property
17+
def code(self) -> str:
18+
"""The unique slug of the workspace.
19+
20+
Slugs are used to identify the workspace.
21+
"""
22+
try:
23+
return os.environ["HEXA_PIPELINE_CODE"]
24+
except KeyError:
25+
raise PipelineConfigError("The pipeline code is not available in this environment.")
26+
27+
@property
28+
def name(self) -> str:
29+
"""The name of the current pipeline."""
30+
try:
31+
return os.environ["HEXA_PIPELINE_NAME"]
32+
except KeyError:
33+
raise PipelineConfigError("The pipeline name is not available in this environment.")
34+
35+
@property
36+
def type(self) -> str:
37+
"""The type of the current pipeline."""
38+
try:
39+
return os.environ["HEXA_PIPELINE_TYPE"]
40+
except KeyError:
41+
raise PipelineConfigError("The pipeline type is not available in this environment.")
42+
43+
44+
if get_environment() == Environment.CLOUD_JUPYTER:
45+
current_pipeline = None
46+
else:
47+
current_pipeline = CurrentPipeline()

0 commit comments

Comments
 (0)