Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ inputs:
- {name: job_name, type: String, description: 'Unique name to identify the job'}
- {name: template_uri, type: String, description: 'GBML Template uri'}
- {name: resource_config_uri, type: String, description: 'Runtine argument for resource and env specifications of each component'}
- {name: cpu_docker_uri, type: String, description: "Uri to dockerized source code compiled for cpu at runtime"}
- {name: cuda_docker_uri, type: String, description: "Uri to dockerized source code compiled for gpu at runtime"}
outputs:
- {name: frozen_gbml_config_uri, type: String, description: 'Output frozen gbml config uri, populated'}

Expand All @@ -15,5 +17,7 @@ implementation:
--job_name, {inputValue: job_name},
--template_uri, {inputValue: template_uri},
--resource_config_uri, {inputValue: resource_config_uri},
--cpu_docker_uri, {inputValue: cpu_docker_uri},
--cuda_docker_uri, {inputValue: cuda_docker_uri},
--output_file_path_frozen_gbml_config_uri, {outputPath: frozen_gbml_config_uri}
]
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ inputs:
- {name: start_at, type: String, description: 'Start component'}
- {name: resource_config_uri, type: String, description: 'Runtine argument for resource and env specifications of each component'}
- {name: stop_after, type: String, description: 'Stop component'}
- {name: cpu_docker_uri, type: String, description: "Uri to dockerized source code compiled for cpu at runtime"}
- {name: cuda_docker_uri, type: String, description: "Uri to dockerized source code compiled for gpu at runtime"}
outputs:

implementation:
Expand All @@ -17,5 +19,7 @@ implementation:
--task_config_uri, {inputValue: task_config_uri},
--start_at, {inputValue: start_at},
--resource_config_uri, {inputValue: resource_config_uri},
--stop_after, {inputValue: stop_after}
--stop_after, {inputValue: stop_after},
--cpu_docker_uri, {inputValue: cpu_docker_uri},
--cuda_docker_uri, {inputValue: cuda_docker_uri}
]
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ inputs:
- {name: task_config_uri, type: String, description: 'Frozen GBML config uri'}
- {name: resource_config_uri, type: String, description: 'Runtine argument for resource and env specifications of each component'}
- {name: custom_worker_image_uri, type: String, description: "Docker image to use for the worker harness in dataflow "}
- {name: cpu_docker_uri, type: String, description: "Uri to dockerized source code compiled for cpu execution at runtime"}
- {name: cuda_docker_uri, type: String, description: "Uri to dockerized source code compiled for gpu execution at runtime"}
outputs:

implementation:
Expand All @@ -16,4 +18,6 @@ implementation:
--task_config_uri, {inputValue: task_config_uri},
--resource_config_uri, {inputValue: resource_config_uri},
--custom_worker_image_uri, {inputValue: custom_worker_image_uri},
--cpu_docker_uri, {inputValue: cpu_docker_uri},
--cuda_docker_uri, {inputValue: cuda_docker_uri},
Comment thread
kmontemayor2-sc marked this conversation as resolved.
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ inputs:
- {name: job_name, type: String, description: 'Unique name to identify the job'}
- {name: task_config_uri, type: String, description: 'Frozen gbml config uri'}
- {name: resource_config_uri, type: String, description: 'Runtine argument for resource and env specifications of each component'}
- {name: cpu_docker_uri, type: String, description: "Uri to dockerized source code compiled for cpu execution at runtime"}
- {name: cuda_docker_uri, type: String, description: "Uri to dockerized source code compiled for gpu execution at runtime"}
outputs:

implementation:
Expand All @@ -14,4 +16,6 @@ implementation:
--job_name, {inputValue: job_name},
--task_config_uri, {inputValue: task_config_uri},
--resource_config_uri, {inputValue: resource_config_uri},
--cpu_docker_uri, {inputValue: cpu_docker_uri},
--cuda_docker_uri, {inputValue: cuda_docker_uri},
Comment thread
kmontemayor2-sc marked this conversation as resolved.
]
15 changes: 15 additions & 0 deletions gigl/orchestration/kubeflow/kfp_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def _generate_component_task(
job_name=job_name,
template_uri=task_config_uri,
resource_config_uri=resource_config_uri,
cpu_docker_uri=common_pipeline_component_configs.cpu_container_image,
cuda_docker_uri=common_pipeline_component_configs.cuda_container_image,
**common_pipeline_component_configs.additional_job_args.get(component, {}),
)

Expand All @@ -76,6 +78,8 @@ def _generate_component_task(
start_at=start_at,
resource_config_uri=resource_config_uri,
stop_after=stop_after,
cpu_docker_uri=common_pipeline_component_configs.cpu_container_image,
cuda_docker_uri=common_pipeline_component_configs.cuda_container_image,
**common_pipeline_component_configs.additional_job_args.get(component, {}),
)
elif component == GiGLComponents.SubgraphSampler:
Expand All @@ -101,6 +105,8 @@ def _generate_component_task(
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
custom_worker_image_uri=common_pipeline_component_configs.dataflow_container_image,
cpu_docker_uri=common_pipeline_component_configs.cpu_container_image,
cuda_docker_uri=common_pipeline_component_configs.cuda_container_image,
**common_pipeline_component_configs.additional_job_args.get(component, {}),
)
elif component == GiGLComponents.Inferencer:
Expand All @@ -113,6 +119,15 @@ def _generate_component_task(
cuda_docker_uri=common_pipeline_component_configs.cuda_container_image,
**common_pipeline_component_configs.additional_job_args.get(component, {}),
)
elif component == GiGLComponents.PostProcessor:
component_task = _speced_component_op_dict[component](
job_name=job_name,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
cpu_docker_uri=common_pipeline_component_configs.cpu_container_image,
cuda_docker_uri=common_pipeline_component_configs.cuda_container_image,
**common_pipeline_component_configs.additional_job_args.get(component, {}),
)
else:
component_task = _speced_component_op_dict[component](
job_name=job_name,
Expand Down
52 changes: 46 additions & 6 deletions gigl/orchestration/local/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from gigl.common.utils.proto_utils import ProtoUtils
from gigl.src.common.constants.components import GiGLComponents
from gigl.src.common.types import AppliedTaskIdentifier
from gigl.src.common.utils.metrics_service_provider import initialize_metrics
from gigl.src.common.utils.gigl_runtime import initialize_gigl_runtime
from gigl.src.config_populator.config_populator import ConfigPopulator
from gigl.src.data_preprocessor.data_preprocessor import DataPreprocessor
from gigl.src.inference.inferencer import Inferencer
Expand Down Expand Up @@ -78,11 +78,6 @@ def run(
f"dataflow_docker_uri: {pipeline_config.dataflow_docker_uri}"
)

initialize_metrics(
task_config_uri=pipeline_config.task_config_uri,
service_name=pipeline_config.applied_task_identifier,
)

if start_at == GiGLComponents.ConfigPopulator.value:
frozen_config_uri = Runner.run_config_populator(pipeline_config)
pipeline_config.task_config_uri = frozen_config_uri
Expand All @@ -107,8 +102,27 @@ def run(
if started:
method(pipeline_config)

@staticmethod
def _initialize_component_runtime(
pipeline_config: PipelineConfig,
component: GiGLComponents,
) -> None:
initialize_gigl_runtime(
applied_task_identifier=pipeline_config.applied_task_identifier,
task_config_uri=pipeline_config.task_config_uri,
resource_config_uri=pipeline_config.resource_config_uri,
service_name=pipeline_config.applied_task_identifier,
component=component,
cpu_docker_uri=pipeline_config.custom_cpu_docker_uri,
cuda_docker_uri=pipeline_config.custom_cuda_docker_uri,
)

@staticmethod
def config_check(start_at: str, pipeline_config: PipelineConfig):
Runner._initialize_component_runtime(
pipeline_config=pipeline_config,
component=GiGLComponents.ConfigValidator,
)
proto_utils = ProtoUtils()
gbml_config_pb: gbml_config_pb2.GbmlConfig = proto_utils.read_proto_from_yaml(
uri=pipeline_config.task_config_uri, proto_cls=gbml_config_pb2.GbmlConfig
Expand All @@ -123,17 +137,27 @@ def config_check(start_at: str, pipeline_config: PipelineConfig):
@staticmethod
def run_config_populator(pipeline_config: PipelineConfig) -> Uri:
logger.info("Running Config Populator...")
Runner._initialize_component_runtime(
pipeline_config=pipeline_config,
component=GiGLComponents.ConfigPopulator,
)
config_populator = ConfigPopulator()

return config_populator.run(
applied_task_identifier=pipeline_config.applied_task_identifier,
task_config_uri=pipeline_config.task_config_uri,
resource_config_uri=pipeline_config.resource_config_uri,
cpu_docker_uri=pipeline_config.custom_cpu_docker_uri,
cuda_docker_uri=pipeline_config.custom_cuda_docker_uri,
)

@staticmethod
def run_data_preprocessor(pipeline_config: PipelineConfig) -> None:
logger.info("Running Data Preprocessor...")
Runner._initialize_component_runtime(
pipeline_config=pipeline_config,
component=GiGLComponents.DataPreprocessor,
)
data_preprocessor = DataPreprocessor()
data_preprocessor.run(
applied_task_identifier=pipeline_config.applied_task_identifier,
Expand All @@ -145,6 +169,10 @@ def run_data_preprocessor(pipeline_config: PipelineConfig) -> None:
@staticmethod
def run_subgraph_sampler(pipeline_config: PipelineConfig) -> None:
logger.info("Running Subgraph Sampler...")
Runner._initialize_component_runtime(
pipeline_config=pipeline_config,
component=GiGLComponents.SubgraphSampler,
)
subgraph_sampler = SubgraphSampler()
subgraph_sampler.run(
applied_task_identifier=pipeline_config.applied_task_identifier,
Expand All @@ -155,6 +183,10 @@ def run_subgraph_sampler(pipeline_config: PipelineConfig) -> None:
@staticmethod
def run_split_generator(pipeline_config: PipelineConfig) -> None:
logger.info("Running Split Generator...")
Runner._initialize_component_runtime(
pipeline_config=pipeline_config,
component=GiGLComponents.SplitGenerator,
)
split_generator = SplitGenerator()
split_generator.run(
applied_task_identifier=pipeline_config.applied_task_identifier,
Expand All @@ -165,6 +197,10 @@ def run_split_generator(pipeline_config: PipelineConfig) -> None:
@staticmethod
def run_trainer(pipeline_config: PipelineConfig) -> None:
logger.info("Running Trainer...")
Runner._initialize_component_runtime(
pipeline_config=pipeline_config,
component=GiGLComponents.Trainer,
)
trainer = Trainer()
trainer.run(
applied_task_identifier=pipeline_config.applied_task_identifier,
Expand All @@ -177,6 +213,10 @@ def run_trainer(pipeline_config: PipelineConfig) -> None:
@staticmethod
def run_inferencer(pipeline_config: PipelineConfig) -> None:
logger.info("Running Inferencer...")
Runner._initialize_component_runtime(
pipeline_config=pipeline_config,
component=GiGLComponents.Inferencer,
)
inferencer = Inferencer()
inferencer.run(
applied_task_identifier=pipeline_config.applied_task_identifier,
Expand Down
30 changes: 28 additions & 2 deletions gigl/src/common/utils/gigl_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from typing import Optional

from gigl.common import Uri
from gigl.env.constants import (
GIGL_CPU_DOCKER_URI_ENV_KEY,
GIGL_CUDA_DOCKER_URI_ENV_KEY,
)
from gigl.src.common.constants.components import GiGLComponents
from gigl.src.common.utils.gigl_env import get_gigl_runtime_env_vars
from gigl.src.common.utils.metrics_service_provider import initialize_metrics
Expand All @@ -20,6 +24,10 @@ def initialize_gigl_runtime(
) -> None:
"""Initialize GiGL runtime environment and metrics for a component.

For ``SubgraphSampler`` and ``SplitGenerator`` only metrics are initialized;
runtime env vars are not set, since these legacy (Scala/Spark) components do
not consume the GiGL Python runtime.

Args:
applied_task_identifier: Unique identifier for the GiGL job.
task_config_uri: URI to the task config YAML file.
Expand All @@ -29,14 +37,32 @@ def initialize_gigl_runtime(
cpu_docker_uri: CPU source image URI. Defaults to the release CPU image.
cuda_docker_uri: CUDA source image URI. Defaults to the release CUDA image.
"""
if component in {GiGLComponents.SubgraphSampler, GiGLComponents.SplitGenerator}:
initialize_metrics(task_config_uri=task_config_uri, service_name=service_name)
return

# TODO(kmonte): Also expose the dataflow docker URI (used as custom_worker_image_uri by
# DataPreprocessor/Inferencer) as a GIGL_DATAFLOW_DOCKER_URI env var for parity with the
# CPU/CUDA docker URIs. Requires a new key in gigl/env/constants.py and threading it
# through get_gigl_runtime_env_vars.
resolved_cpu_docker_uri = (
os.environ.get(GIGL_CPU_DOCKER_URI_ENV_KEY)
if cpu_docker_uri is None
else cpu_docker_uri
)
resolved_cuda_docker_uri = (
os.environ.get(GIGL_CUDA_DOCKER_URI_ENV_KEY)
if cuda_docker_uri is None
else cuda_docker_uri
)
os.environ.update(
get_gigl_runtime_env_vars(
applied_task_identifier=applied_task_identifier,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
component=component,
cpu_docker_uri=cpu_docker_uri,
cuda_docker_uri=cuda_docker_uri,
cpu_docker_uri=resolved_cpu_docker_uri,
cuda_docker_uri=resolved_cuda_docker_uri,
)
)
initialize_metrics(task_config_uri=task_config_uri, service_name=service_name)
31 changes: 28 additions & 3 deletions gigl/src/config_populator/config_populator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
from gigl.common.metrics.decorators import flushes_metrics, profileit
from gigl.common.utils.proto_utils import ProtoUtils
from gigl.env.pipelines_config import get_resource_config
from gigl.src.common.constants.components import GiGLComponents
from gigl.src.common.constants.metrics import TIMER_CONFIG_POPULATOR_S
from gigl.src.common.types import AppliedTaskIdentifier
from gigl.src.common.types.dataset_split import DatasetSplit
from gigl.src.common.types.graph_data import EdgeType, NodeType, Relation
from gigl.src.common.types.pb_wrappers.gbml_config import GbmlConfigPbWrapper
from gigl.src.common.types.pb_wrappers.task_metadata import TaskMetadataPbWrapper
from gigl.src.common.types.task_metadata import TaskMetadataType
from gigl.src.common.utils.gigl_runtime import initialize_gigl_runtime
from gigl.src.common.utils.metrics_service_provider import (
get_metrics_service_instance,
initialize_metrics,
)
from snapchat.research.gbml import (
dataset_metadata_pb2,
Expand Down Expand Up @@ -616,6 +617,8 @@ def run(
applied_task_identifier: AppliedTaskIdentifier,
task_config_uri: Uri,
resource_config_uri: Uri,
cpu_docker_uri: Optional[str] = None,
cuda_docker_uri: Optional[str] = None,
) -> GcsUri:
"""
Runs the ConfigPopulator; given an input GbmlConfig file, produces a frozen one.
Expand All @@ -624,12 +627,20 @@ def run(
applied_task_identifier (AppliedTaskIdentifier): The job name.
task_config_uri (Uri): Template GbmlConfig URI.
resource_config_uri: GiGL resource config Uri
cpu_docker_uri (Optional[str]): CPU source image URI. Defaults to the release CPU image.
cuda_docker_uri (Optional[str]): CUDA source image URI. Defaults to the release CUDA image.

Returns:
GcsUri: The URI of the frozen GbmlConfig.
"""
initialize_metrics(
task_config_uri=task_config_uri, service_name=applied_task_identifier
initialize_gigl_runtime(
applied_task_identifier=applied_task_identifier,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
service_name=applied_task_identifier,
component=GiGLComponents.ConfigPopulator,
cpu_docker_uri=cpu_docker_uri,
cuda_docker_uri=cuda_docker_uri,
)

resource_config = get_resource_config(resource_config_uri=resource_config_uri)
Expand Down Expand Up @@ -673,6 +684,18 @@ def run(
type=str,
help="Runtime argument for resource and env specifications of each component",
)
parser.add_argument(
"--cpu_docker_uri",
type=str,
default=None,
help="Uri to dockerized source code compiled for cpu at runtime",
)
parser.add_argument(
"--cuda_docker_uri",
type=str,
default=None,
help="Uri to dockerized source code compiled for gpu at runtime",
)

args = parser.parse_args()

Expand All @@ -688,6 +711,8 @@ def run(
applied_task_identifier=ati,
task_config_uri=template_uri,
resource_config_uri=resource_config_uri,
cpu_docker_uri=args.cpu_docker_uri,
cuda_docker_uri=args.cuda_docker_uri,
)

# Write fozen_gbml_config_uri to file where it can be read by subsequent components
Expand Down
Loading