Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion openhexa/sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""SDK package."""

from .datasets import Dataset
from .pipelines import current_run, parameter, pipeline
from .pipelines import current_pipeline, current_run, parameter, pipeline
from .pipelines.parameter import DHIS2Widget, IASOWidget
from .utils import OpenHexaClient
from .workspaces import workspace
Expand All @@ -17,6 +17,7 @@
__all__ = [
"workspace",
"pipeline",
"current_pipeline",
"parameter",
"current_run",
"DHIS2Connection",
Expand Down
1 change: 1 addition & 0 deletions openhexa/sdk/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"Pipeline",
"parameter",
"current_run",
"current_pipeline",
"import_pipeline",
"download_pipeline",
"get_local_workspace_config",
Expand Down
47 changes: 47 additions & 0 deletions openhexa/sdk/pipelines/current_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Current Pipeline context module."""
import os

from openhexa.sdk.utils import Environment, get_environment


class PipelineConfigError(Exception):
"""Raised whenever the system cannot find an environment variable required to configure the current pipeline."""

pass


class CurrentPipeline:
"""It is used to view manage the current pipeline state and operations."""

@property
def code(self) -> str:
"""The unique slug of the workspace.

Slugs are used to identify the workspace.
"""
try:
return os.environ["HEXA_PIPELINE_CODE"]
except KeyError:
raise PipelineConfigError("The pipeline code is not available in this environment.")

@property
def name(self) -> str:
"""The name of the current pipeline."""
try:
return os.environ["HEXA_PIPELINE_NAME"]
except KeyError:
raise PipelineConfigError("The pipeline name is not available in this environment.")

@property
def type(self) -> str:
"""The type of the current pipeline."""
try:
return os.environ["HEXA_PIPELINE_TYPE"]
except KeyError:
raise PipelineConfigError("The pipeline type is not available in this environment.")


if get_environment() == Environment.CLOUD_JUPYTER:
current_pipeline = None
else:
current_pipeline = CurrentPipeline()
7 changes: 7 additions & 0 deletions openhexa/sdk/pipelines/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ class CurrentRun:
def _connected(self):
return "HEXA_SERVER_URL" in os.environ

def id(self):
"""Exposes pipeline run id."""
if "HEXA_RUN_ID" in os.environ:
return os.environ["HEXA_RUN_ID"]
else:
raise RuntimeError("Current run ID is not set. Make sure you are running this in a pipeline context.")

def add_file_output(self, path: str):
"""Record a run output for a file creation operation.

Expand Down