Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions gigl/src/validation_check/config_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
assert_subgraph_sampler_output_exists,
assert_trained_model_exists,
)
from gigl.src.validation_check.libs.gbml_and_resource_config_compatibility_checks import (
check_inferencer_graph_store_compatibility,
check_trainer_graph_store_compatibility,
)
from gigl.src.validation_check.libs.name_checks import (
check_if_kfp_pipeline_job_name_valid,
)
Expand Down Expand Up @@ -191,6 +195,79 @@

logger = Logger()

# Graph store compatibility checks to run, keyed by the component the pipeline starts at.
# The trainer check is only run when starting at or before Trainer, and the inferencer
# check only when starting at or before Inferencer.
# PostProcessor has no entry because it needs no graph store compatibility checks.
START_COMPONENT_TO_GRAPH_STORE_COMPATIBILITY_CHECKS = {
    # Every start point up to and including Trainer runs both checks. The list is
    # rebuilt for each key so that no two entries share one mutable list object.
    **{
        start_component.value: [
            check_trainer_graph_store_compatibility,
            check_inferencer_graph_store_compatibility,
        ]
        for start_component in (
            GiGLComponents.ConfigPopulator,
            GiGLComponents.DataPreprocessor,
            GiGLComponents.SubgraphSampler,
            GiGLComponents.SplitGenerator,
            GiGLComponents.Trainer,
        )
    },
    GiGLComponents.Inferencer.value: [check_inferencer_graph_store_compatibility],
}

# Checks to drop from the start-component selection, keyed by the component the
# pipeline stops after (presumably because the Inferencer never runs when the
# pipeline stops after Trainer).
STOP_COMPONENT_TO_GRAPH_STORE_COMPATIBILITY_CHECKS_TO_SKIP = {
    GiGLComponents.Trainer.value: [check_inferencer_graph_store_compatibility],
}


def _run_gbml_and_resource_config_compatibility_checks(
    start_at: str,
    stop_after: Optional[str],
    gbml_config_pb_wrapper: GbmlConfigPbWrapper,
    resource_config_wrapper: GiglResourceConfigWrapper,
) -> None:
    """
    Run compatibility checks between GbmlConfig and GiglResourceConfig.

    The checks to run are looked up by ``start_at`` in
    START_COMPONENT_TO_GRAPH_STORE_COMPATIBILITY_CHECKS; any check listed for
    ``stop_after`` in STOP_COMPONENT_TO_GRAPH_STORE_COMPATIBILITY_CHECKS_TO_SKIP is
    then dropped. The remaining checks run in the order they are declared in the
    start-component map, so the first failure reported is the same on every run.

    Args:
        start_at: The component to start at. A component with no entry in the
            start-component map results in no checks being run.
        stop_after: Optional component to stop after.
        gbml_config_pb_wrapper: The GbmlConfig wrapper (template config).
        resource_config_wrapper: The GiglResourceConfig wrapper (resource config).

    Raises:
        Whatever the individual checks raise; the first failing check aborts the run.
    """
    checks_to_skip: list = []
    if stop_after in STOP_COMPONENT_TO_GRAPH_STORE_COMPATIBILITY_CHECKS_TO_SKIP:
        checks_to_skip = STOP_COMPONENT_TO_GRAPH_STORE_COMPATIBILITY_CHECKS_TO_SKIP[
            stop_after
        ]

    # dict.fromkeys de-duplicates while keeping declaration order. A set of function
    # objects would iterate in an arbitrary order that can differ between runs.
    compatibility_checks = [
        check
        for check in dict.fromkeys(
            START_COMPONENT_TO_GRAPH_STORE_COMPATIBILITY_CHECKS.get(start_at, [])
        )
        if check not in checks_to_skip
    ]

    for check in compatibility_checks:
        check(
            gbml_config_pb_wrapper=gbml_config_pb_wrapper,
            resource_config_wrapper=resource_config_wrapper,
        )


def kfp_validation_checks(
job_name: str,
Expand Down Expand Up @@ -261,6 +338,15 @@ def kfp_validation_checks(
f"Skipping resource config check {resource_config_check.__name__} because we are using live subgraph sampling backend."
)

# check compatibility between template config and resource config for graph store mode
# These checks ensure that if graph store mode is enabled in one config, it's also enabled in the other
_run_gbml_and_resource_config_compatibility_checks(
start_at=start_at,
stop_after=stop_after,
gbml_config_pb_wrapper=gbml_config_pb_wrapper,
resource_config_wrapper=resource_config_wrapper,
)

# check if trained model file exist when skipping training
if gbml_config_pb.shared_config.should_skip_training == True:
assert_trained_model_exists(gbml_config_pb=gbml_config_pb)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""
Compatibility checks between GbmlConfig (template config) and GiglResourceConfig (resource config).

These checks ensure that graph store mode configurations are consistent across both configs.
If graph store mode is set up for trainer or inferencer in one config, it must be set up in the other.
"""

from typing import Literal

from google.protobuf.message import Message

from gigl.common.logger import Logger
from gigl.src.common.types.pb_wrappers.gbml_config import GbmlConfigPbWrapper
from gigl.src.common.types.pb_wrappers.gigl_resource_config import (
GiglResourceConfigWrapper,
)
from snapchat.research.gbml import gigl_resource_config_pb2

# Module-level logger used by the compatibility checks in this file.
logger = Logger()


def _gbml_config_has_graph_store(
    gbml_config_pb_wrapper: GbmlConfigPbWrapper,
    component: Literal["trainer", "inferencer"],
) -> bool:
    """
    Check if the GbmlConfig has graph_store_storage_config set for the given component.

    Args:
        gbml_config_pb_wrapper: The GbmlConfig wrapper to check.
        component: Which component's config to inspect: "trainer" reads
            ``trainer_config`` and "inferencer" reads ``inferencer_config``.

    Returns:
        True if graph_store_storage_config is set on the selected component's config,
        False otherwise.

    Raises:
        ValueError: If ``component`` is neither "inferencer" nor "trainer".
    """
    # Reject unknown components before touching the proto.
    if component not in ("inferencer", "trainer"):
        raise ValueError(
            f"Invalid component: {component}. Must be 'inferencer' or 'trainer'."
        )

    gbml_config_pb = gbml_config_pb_wrapper.gbml_config_pb
    component_config: Message = (
        gbml_config_pb.inferencer_config
        if component == "inferencer"
        else gbml_config_pb.trainer_config
    )
    return component_config.HasField("graph_store_storage_config")


def _resource_config_has_graph_store(
    resource_config_wrapper: GiglResourceConfigWrapper,
    component: Literal["trainer", "inferencer"],
) -> bool:
    """
    Check if the GiglResourceConfig uses VertexAiGraphStoreConfig for the given component.

    Args:
        resource_config_wrapper: The resource config wrapper to check.
        component: Which component's resource config to inspect: "trainer" reads
            ``trainer_config`` and "inferencer" reads ``inferencer_config`` on the wrapper.

    Returns:
        True if the selected component's resource config is an instance of
        VertexAiGraphStoreConfig, False otherwise.

    Raises:
        ValueError: If ``component`` is neither "trainer" nor "inferencer".
    """
    # Reject unknown components before reading anything from the wrapper.
    if component not in ("trainer", "inferencer"):
        raise ValueError(
            f"Invalid component: {component}. Must be 'trainer' or 'inferencer'."
        )

    component_config: Message = (
        resource_config_wrapper.trainer_config
        if component == "trainer"
        else resource_config_wrapper.inferencer_config
    )
    return isinstance(
        component_config, gigl_resource_config_pb2.VertexAiGraphStoreConfig
    )


def check_trainer_graph_store_compatibility(
    gbml_config_pb_wrapper: GbmlConfigPbWrapper,
    resource_config_wrapper: GiglResourceConfigWrapper,
) -> None:
    """
    Check that trainer graph store mode is consistently configured across both configs.

    The template side counts as graph-store enabled when
    GbmlConfig.trainer_config.graph_store_storage_config is set; the resource side
    counts as enabled when the trainer resource config is a VertexAiGraphStoreConfig.
    Both sides must agree: either both enabled or both disabled.

    Args:
        gbml_config_pb_wrapper: The GbmlConfig wrapper (template config).
        resource_config_wrapper: The GiglResourceConfig wrapper (resource config).

    Raises:
        AssertionError: If exactly one of the two configs has graph store mode enabled.
    """
    logger.info(
        "Config validation check: trainer graph store compatibility between template and resource configs."
    )

    template_uses_graph_store = _gbml_config_has_graph_store(
        gbml_config_pb_wrapper, "trainer"
    )
    resource_uses_graph_store = _resource_config_has_graph_store(
        resource_config_wrapper, "trainer"
    )

    # Both sides agree, so there is nothing to report.
    if template_uses_graph_store == resource_uses_graph_store:
        return

    raise AssertionError(
        "If one of GbmlConfig.trainer_config.graph_store_storage_config or "
        "GiglResourceConfig.trainer_resource_config is set, the other must also be set. "
        f"GbmlConfig.trainer_config.graph_store_storage_config is set: {template_uses_graph_store}, "
        f"GiglResourceConfig.trainer_resource_config is set: {resource_uses_graph_store}."
    )


def check_inferencer_graph_store_compatibility(
    gbml_config_pb_wrapper: GbmlConfigPbWrapper,
    resource_config_wrapper: GiglResourceConfigWrapper,
) -> None:
    """
    Check that inferencer graph store mode is consistently configured across both configs.

    The template side counts as graph-store enabled when
    GbmlConfig.inferencer_config.graph_store_storage_config is set; the resource side
    counts as enabled when the inferencer resource config is a VertexAiGraphStoreConfig.
    Both sides must agree: either both enabled or both disabled.

    Args:
        gbml_config_pb_wrapper: The GbmlConfig wrapper (template config).
        resource_config_wrapper: The GiglResourceConfig wrapper (resource config).

    Raises:
        AssertionError: If exactly one of the two configs has graph store mode enabled.
    """
    logger.info(
        "Config validation check: inferencer graph store compatibility between template and resource configs."
    )

    template_uses_graph_store = _gbml_config_has_graph_store(
        gbml_config_pb_wrapper, "inferencer"
    )
    resource_uses_graph_store = _resource_config_has_graph_store(
        resource_config_wrapper, "inferencer"
    )

    # Both sides agree, so there is nothing to report.
    if template_uses_graph_store == resource_uses_graph_store:
        return

    raise AssertionError(
        "If one of GbmlConfig.inferencer_config.graph_store_storage_config or "
        "GiglResourceConfig.inferencer_resource_config is set, the other must also be set. "
        f"GbmlConfig.inferencer_config.graph_store_storage_config is set: {template_uses_graph_store}, "
        f"GiglResourceConfig.inferencer_resource_config is set: {resource_uses_graph_store}."
    )
51 changes: 51 additions & 0 deletions gigl/src/validation_check/libs/resource_config_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from google.cloud.aiplatform_v1.types.accelerator_type import AcceleratorType

from gigl.common.logger import Logger
from gigl.src.common.types.pb_wrappers.gbml_config import GbmlConfigPbWrapper
from gigl.src.common.types.pb_wrappers.gigl_resource_config import (
GiglResourceConfigWrapper,
)
Expand Down Expand Up @@ -254,3 +255,53 @@ def _validate_machine_config(
or {gigl_resource_config_pb2.VertexAiGraphStoreConfig.__name__}.
Got {type(config)}"""
)


def check_if_trainer_graph_store_storage_command_valid(
    gbml_config_pb_wrapper: GbmlConfigPbWrapper,
) -> None:
    """
    Validates that the storage command is set when graph store mode is enabled for trainer.

    Graph store mode is considered enabled when
    GbmlConfig.trainer_config.graph_store_storage_config is set. In that case its
    ``command`` field must be non-empty. Nothing is checked when graph store mode
    is not enabled.

    Args:
        gbml_config_pb_wrapper: The GbmlConfig wrapper to check.

    Raises:
        AssertionError: If graph store mode is enabled but the storage command is missing.
    """
    logger.info(
        "Config validation check: if trainer graph store storage_command is valid."
    )
    trainer_config = gbml_config_pb_wrapper.gbml_config_pb.trainer_config

    # Graph store mode is off for the trainer, so there is no command to validate.
    if not trainer_config.HasField("graph_store_storage_config"):
        return

    if not trainer_config.graph_store_storage_config.command:
        # The message names the field actually read above (`command`), so users are
        # pointed at the right place in their template config.
        raise AssertionError(
            "GbmlConfig.trainer_config.graph_store_storage_config.command must be set "
            "when using graph store mode for trainer."
        )


def check_if_inferencer_graph_store_storage_command_valid(
    gbml_config_pb_wrapper: GbmlConfigPbWrapper,
) -> None:
    """
    Validates that the storage command is set when graph store mode is enabled for inferencer.

    Graph store mode is considered enabled when
    GbmlConfig.inferencer_config.graph_store_storage_config is set. In that case its
    ``command`` field must be non-empty. Nothing is checked when graph store mode
    is not enabled.

    Args:
        gbml_config_pb_wrapper: The GbmlConfig wrapper to check.

    Raises:
        AssertionError: If graph store mode is enabled but the storage command is missing.
    """
    logger.info(
        "Config validation check: if inferencer graph store storage_command is valid."
    )
    inferencer_config = gbml_config_pb_wrapper.gbml_config_pb.inferencer_config

    # Graph store mode is off for the inferencer, so there is no command to validate.
    if not inferencer_config.HasField("graph_store_storage_config"):
        return

    if not inferencer_config.graph_store_storage_config.command:
        # The message names the field actually read above (`command`), so users are
        # pointed at the right place in their template config.
        raise AssertionError(
            "GbmlConfig.inferencer_config.graph_store_storage_config.command must be set "
            "when using graph store mode for inferencer."
        )
Loading