diff --git a/sagemaker-core/src/sagemaker/core/resources.py b/sagemaker-core/src/sagemaker/core/resources.py index 66b13e112a..143fb683e7 100644 --- a/sagemaker-core/src/sagemaker/core/resources.py +++ b/sagemaker-core/src/sagemaker/core/resources.py @@ -13036,6 +13036,335 @@ def get_name(self) -> str: logger.error("Name attribute not found for object feature_group") return None + @staticmethod + def _s3_uri_to_arn(s3_uri: str) -> str: + """ + Convert S3 URI to S3 ARN format. + + Args: + s3_uri: S3 URI in format s3://bucket/path or already an ARN + + Returns: + S3 ARN in format arn:aws:s3:::bucket/path + """ + if s3_uri.startswith("arn:aws:s3:::"): + return s3_uri + path = s3_uri.replace("s3://", "") + return f"arn:aws:s3:::{path}" + + def _get_lake_formation_client( + self, + session: Optional[Session] = None, + region: Optional[str] = None, + ): + """ + Get a Lake Formation client. + + Args: + session: Boto3 session. If not provided, a new session will be created. + region: AWS region name. + + Returns: + A boto3 Lake Formation client. + """ + boto_session = session or Session() + return boto_session.client("lakeformation", region_name=region) + + def _register_s3_with_lake_formation( + self, + s3_location: str, + session: Optional[Session] = None, + region: Optional[str] = None, + use_service_linked_role: bool = True, + role_arn: Optional[str] = None, + ) -> bool: + """ + Register an S3 location with Lake Formation. + + Args: + s3_location: S3 URI or ARN to register. + session: Boto3 session. + region: AWS region. + use_service_linked_role: Whether to use the Lake Formation service-linked role. + If True, Lake Formation uses its service-linked role for registration. + If False, role_arn must be provided. + role_arn: IAM role ARN to use for registration. Required when + use_service_linked_role is False. + + Returns: + True if registration succeeded or location already registered. + + Raises: + ValueError: If use_service_linked_role is False but role_arn is not provided. + ClientError: If registration fails for unexpected reasons. + """ + if not use_service_linked_role and not role_arn: + raise ValueError( + "role_arn must be provided when use_service_linked_role is False" + ) + + client = self._get_lake_formation_client(session, region) + resource_arn = self._s3_uri_to_arn(s3_location) + + try: + register_params = {"ResourceArn": resource_arn} + + if use_service_linked_role: + register_params["UseServiceLinkedRole"] = True + else: + register_params["RoleArn"] = role_arn + + client.register_resource(**register_params) + logger.info(f"Successfully registered S3 location: {resource_arn}") + return True + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "AlreadyExistsException": + logger.info(f"S3 location already registered: {resource_arn}") + return True + raise + + def _revoke_iam_allowed_principal( + self, + database_name: str, + table_name: str, + session: Optional[Session] = None, + region: Optional[str] = None, + ) -> bool: + """ + Revoke IAMAllowedPrincipal permissions from a Glue table. + + Args: + database_name: Glue database name. + table_name: Glue table name. + session: Boto3 session. + region: AWS region. + + Returns: + True if revocation succeeded or permissions didn't exist. + + Raises: + ClientError: If revocation fails for unexpected reasons. + """ + client = self._get_lake_formation_client(session, region) + + try: + client.revoke_permissions( + Principal={"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"}, + Resource={ + "Table": { + "DatabaseName": database_name, + "Name": table_name, + } + }, + Permissions=["ALL"], + ) + logger.info( + f"Revoked IAMAllowedPrincipal from table: {database_name}.{table_name}" + ) + return True + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "InvalidInputException": + logger.info( + f"IAMAllowedPrincipal permissions may not exist on: {database_name}.{table_name}" + ) + return True + raise + + def _grant_lake_formation_permissions( + self, + role_arn: str, + database_name: str, + table_name: str, + session: Optional[Session] = None, + region: Optional[str] = None, + ) -> bool: + """ + Grant permissions to a role on a Glue table via Lake Formation. + + Args: + role_arn: IAM role ARN to grant permissions to. + database_name: Glue database name. + table_name: Glue table name. + session: Boto3 session. + region: AWS region. + + Returns: + True if grant succeeded or permissions already exist. + + Raises: + ClientError: If grant fails for unexpected reasons. + """ + client = self._get_lake_formation_client(session, region) + permissions = ["SELECT", "INSERT", "DELETE", "DESCRIBE", "ALTER"] + + try: + client.grant_permissions( + Principal={"DataLakePrincipalIdentifier": role_arn}, + Resource={ + "Table": { + "DatabaseName": database_name, + "Name": table_name, + } + }, + Permissions=permissions, + PermissionsWithGrantOption=[], + ) + logger.info( + f"Granted permissions to {role_arn} on table: {database_name}.{table_name}" + ) + return True + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "InvalidInputException": + logger.info( + f"Permissions may already exist for {role_arn} on: {database_name}.{table_name}" + ) + return True + raise + + @Base.add_validate_call + def enable_lake_formation( + self, + session: Optional[Session] = None, + region: Optional[StrPipeVar] = None, + use_service_linked_role: bool = True, + registration_role_arn: Optional[str] = None, + ) -> dict: + """ + Enable Lake Formation governance for this Feature Group's offline store. + + This method: + 1. Registers the offline store S3 location with Lake Formation + 2. Grants the execution role permissions on the Glue table + 3. Revokes IAMAllowedPrincipal permissions from the Glue table + + The role ARN is automatically extracted from the Feature Group's configuration. + Each phase depends on the success of the previous phase - if any phase fails, + subsequent phases are not executed. + + Parameters: + session: Boto3 session. + region: Region name. + use_service_linked_role: Whether to use the Lake Formation service-linked role + for S3 registration. If True, Lake Formation uses its service-linked role. + If False, registration_role_arn must be provided. Default is True. + registration_role_arn: IAM role ARN to use for S3 registration with Lake Formation. + Required when use_service_linked_role is False. This is separate from the + Feature Group's execution role (role_arn), which is used for Glue table permissions. + + Returns: + Dict with status of each Lake Formation operation: + - s3_registration: bool + - iam_principal_revoked: bool + - permissions_granted: bool + + Raises: + ValueError: If the Feature Group has no offline store configured, + if role_arn is not set on the Feature Group, or if use_service_linked_role + is False but registration_role_arn is not provided. + ClientError: If Lake Formation operations fail. + RuntimeError: If a phase fails and subsequent phases cannot proceed. + """ + # Refresh to get latest state + self.refresh() + + # Validate offline store exists + if self.offline_store_config is None or self.offline_store_config == Unassigned(): + raise ValueError( + f"Feature Group '{self.feature_group_name}' does not have an offline store configured. " + "Lake Formation can only be enabled for Feature Groups with offline stores." + ) + + # Get role ARN from Feature Group config + if self.role_arn is None or self.role_arn == Unassigned(): + raise ValueError( + f"Feature Group '{self.feature_group_name}' does not have a role_arn configured. " + "Lake Formation requires a role ARN to grant permissions." + ) + + # Extract required configuration + s3_config = self.offline_store_config.s3_storage_config + if s3_config is None: + raise ValueError("Offline store S3 configuration is missing") + + resolved_s3_uri = s3_config.resolved_output_s3_uri + if resolved_s3_uri is None or resolved_s3_uri == Unassigned(): + raise ValueError( + "Resolved S3 URI not available. Ensure the Feature Group is in 'Created' status." + ) + + data_catalog_config = self.offline_store_config.data_catalog_config + if data_catalog_config is None: + raise ValueError("Data catalog configuration is missing from offline store config") + + database_name = data_catalog_config.database + table_name = data_catalog_config.table_name + + if not database_name or not table_name: + raise ValueError("Database name and table name are required from data catalog config") + + # Execute Lake Formation setup with fail-fast behavior + results = { + "s3_registration": False, + "iam_principal_revoked": False, + "permissions_granted": False, + } + + # Phase 1: Register S3 with Lake Formation + try: + results["s3_registration"] = self._register_s3_with_lake_formation( + resolved_s3_uri, + session, + region, + use_service_linked_role=use_service_linked_role, + role_arn=registration_role_arn, + ) + except Exception as e: + raise RuntimeError( + f"Failed to register S3 location with Lake Formation. " + f"Subsequent phases skipped. Results: {results}. Error: {e}" + ) from e + + if not results["s3_registration"]: + raise RuntimeError( + f"Failed to register S3 location with Lake Formation. " + f"Subsequent phases skipped. Results: {results}" + ) + + # Phase 2: Grant Lake Formation permissions to the role + try: + results["permissions_granted"] = self._grant_lake_formation_permissions( + self.role_arn, database_name, table_name, session, region + ) + except Exception as e: + raise RuntimeError( + f"Failed to grant Lake Formation permissions. " + f"Subsequent phases skipped. Results: {results}. Error: {e}" + ) from e + + if not results["permissions_granted"]: + raise RuntimeError( + f"Failed to grant Lake Formation permissions. " + f"Subsequent phases skipped. Results: {results}" + ) + + # Phase 3: Revoke IAMAllowedPrincipal permissions + try: + results["iam_principal_revoked"] = self._revoke_iam_allowed_principal( + database_name, table_name, session, region + ) + except Exception as e: + raise RuntimeError( + f"Failed to revoke IAMAllowedPrincipal permissions. Results: {results}. Error: {e}" + ) from e + + if not results["iam_principal_revoked"]: + raise RuntimeError( + f"Failed to revoke IAMAllowedPrincipal permissions. Results: {results}" + ) + + logger.info(f"Lake Formation setup complete for {self.feature_group_name}: {results}") + return results + def populate_inputs_decorator(create_func): @functools.wraps(create_func) def wrapper(*args, **kwargs): @@ -13075,6 +13404,7 @@ def create( description: Optional[StrPipeVar] = Unassigned(), tags: Optional[List[Tag]] = Unassigned(), use_pre_prod_offline_store_replicator_lambda: Optional[bool] = Unassigned(), + enable_lake_formation: bool = False, session: Optional[Session] = None, region: Optional[StrPipeVar] = None, ) -> Optional["FeatureGroup"]: @@ -13093,6 +13423,7 @@ def create( description: A free-form description of a FeatureGroup. tags: Tags used to identify Features in each FeatureGroup. use_pre_prod_offline_store_replicator_lambda: + enable_lake_formation: If True, enables Lake Formation governance for the offline store. Requires offline_store_config and role_arn to be set. The role_arn will be extracted from the Feature Group configuration. Defaults to False. session: Boto3 session. region: Region name. @@ -13116,6 +13447,17 @@ def create( S3ConfigNotFoundError: Raised when a configuration file is not found in S3 """ + # Validation for Lake Formation + if enable_lake_formation: + if offline_store_config == Unassigned() or offline_store_config is None: + raise ValueError( + "enable_lake_formation=True requires offline_store_config to be configured" + ) + if role_arn == Unassigned() or role_arn is None: + raise ValueError( + "enable_lake_formation=True requires role_arn to be specified" + ) + logger.info("Creating feature_group resource.") client = Base.get_sagemaker_client( session=session, region_name=region, service_name="sagemaker" @@ -13148,7 +13490,14 @@ def create( response = client.create_feature_group(**operation_input_args) logger.debug(f"Response: {response}") - return cls.get(feature_group_name=feature_group_name, session=session, region=region) + feature_group = cls.get(feature_group_name=feature_group_name, session=session, region=region) + + # Enable Lake Formation if requested + if enable_lake_formation: + feature_group.wait_for_status(target_status="Created") + feature_group.enable_lake_formation(session=session, region=region) + + return feature_group @classmethod @Base.add_validate_call