Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
351 changes: 350 additions & 1 deletion sagemaker-core/src/sagemaker/core/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -13036,6 +13036,335 @@ def get_name(self) -> str:
logger.error("Name attribute not found for object feature_group")
return None

@staticmethod
def _s3_uri_to_arn(s3_uri: str) -> str:
"""
Convert S3 URI to S3 ARN format.

Args:
s3_uri: S3 URI in format s3://bucket/path or already an ARN

Returns:
S3 ARN in format arn:aws:s3:::bucket/path
"""
if s3_uri.startswith("arn:aws:s3:::"):
return s3_uri
path = s3_uri.replace("s3://", "")
return f"arn:aws:s3:::{path}"

def _get_lake_formation_client(
self,
session: Optional[Session] = None,
region: Optional[str] = None,
):
"""
Get a Lake Formation client.

Args:
session: Boto3 session. If not provided, a new session will be created.
region: AWS region name.

Returns:
A boto3 Lake Formation client.
"""
boto_session = session or Session()
return boto_session.client("lakeformation", region_name=region)

def _register_s3_with_lake_formation(
self,
s3_location: str,
session: Optional[Session] = None,
region: Optional[str] = None,
use_service_linked_role: bool = True,
role_arn: Optional[str] = None,
) -> bool:
"""
Register an S3 location with Lake Formation.

Args:
s3_location: S3 URI or ARN to register.
session: Boto3 session.
region: AWS region.
use_service_linked_role: Whether to use the Lake Formation service-linked role.
If True, Lake Formation uses its service-linked role for registration.
If False, role_arn must be provided.
role_arn: IAM role ARN to use for registration. Required when
use_service_linked_role is False.

Returns:
True if registration succeeded or location already registered.

Raises:
ValueError: If use_service_linked_role is False but role_arn is not provided.
ClientError: If registration fails for unexpected reasons.
"""
if not use_service_linked_role and not role_arn:
raise ValueError(
"role_arn must be provided when use_service_linked_role is False"
)

client = self._get_lake_formation_client(session, region)
resource_arn = self._s3_uri_to_arn(s3_location)

try:
register_params = {"ResourceArn": resource_arn}

if use_service_linked_role:
register_params["UseServiceLinkedRole"] = True
else:
register_params["RoleArn"] = role_arn

client.register_resource(**register_params)
logger.info(f"Successfully registered S3 location: {resource_arn}")
return True
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "AlreadyExistsException":
logger.info(f"S3 location already registered: {resource_arn}")
return True
raise

def _revoke_iam_allowed_principal(
self,
database_name: str,
table_name: str,
session: Optional[Session] = None,
region: Optional[str] = None,
) -> bool:
"""
Revoke IAMAllowedPrincipal permissions from a Glue table.

Args:
database_name: Glue database name.
table_name: Glue table name.
session: Boto3 session.
region: AWS region.

Returns:
True if revocation succeeded or permissions didn't exist.

Raises:
ClientError: If revocation fails for unexpected reasons.
"""
client = self._get_lake_formation_client(session, region)

try:
client.revoke_permissions(
Principal={"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"},
Resource={
"Table": {
"DatabaseName": database_name,
"Name": table_name,
}
},
Permissions=["ALL"],
)
logger.info(
f"Revoked IAMAllowedPrincipal from table: {database_name}.{table_name}"
)
return True
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "InvalidInputException":
logger.info(
f"IAMAllowedPrincipal permissions may not exist on: {database_name}.{table_name}"
)
return True
raise

def _grant_lake_formation_permissions(
self,
role_arn: str,
database_name: str,
table_name: str,
session: Optional[Session] = None,
region: Optional[str] = None,
) -> bool:
"""
Grant permissions to a role on a Glue table via Lake Formation.

Args:
role_arn: IAM role ARN to grant permissions to.
database_name: Glue database name.
table_name: Glue table name.
session: Boto3 session.
region: AWS region.

Returns:
True if grant succeeded or permissions already exist.

Raises:
ClientError: If grant fails for unexpected reasons.
"""
client = self._get_lake_formation_client(session, region)
permissions = ["SELECT", "INSERT", "DELETE", "DESCRIBE", "ALTER"]

try:
client.grant_permissions(
Principal={"DataLakePrincipalIdentifier": role_arn},
Resource={
"Table": {
"DatabaseName": database_name,
"Name": table_name,
}
},
Permissions=permissions,
PermissionsWithGrantOption=[],
)
logger.info(
f"Granted permissions to {role_arn} on table: {database_name}.{table_name}"
)
return True
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "InvalidInputException":
logger.info(
f"Permissions may already exist for {role_arn} on: {database_name}.{table_name}"
)
return True
raise

@Base.add_validate_call
def enable_lake_formation(
self,
session: Optional[Session] = None,
region: Optional[StrPipeVar] = None,
use_service_linked_role: bool = True,
registration_role_arn: Optional[str] = None,
) -> dict:
"""
Enable Lake Formation governance for this Feature Group's offline store.

This method:
1. Registers the offline store S3 location with Lake Formation
2. Grants the execution role permissions on the Glue table
3. Revokes IAMAllowedPrincipal permissions from the Glue table

The role ARN is automatically extracted from the Feature Group's configuration.
Each phase depends on the success of the previous phase - if any phase fails,
subsequent phases are not executed.

Parameters:
session: Boto3 session.
region: Region name.
use_service_linked_role: Whether to use the Lake Formation service-linked role
for S3 registration. If True, Lake Formation uses its service-linked role.
If False, registration_role_arn must be provided. Default is True.
registration_role_arn: IAM role ARN to use for S3 registration with Lake Formation.
Required when use_service_linked_role is False. This is separate from the
Feature Group's execution role (role_arn), which is used for Glue table permissions.

Returns:
Dict with status of each Lake Formation operation:
- s3_registration: bool
- iam_principal_revoked: bool
- permissions_granted: bool

Raises:
ValueError: If the Feature Group has no offline store configured,
if role_arn is not set on the Feature Group, or if use_service_linked_role
is False but registration_role_arn is not provided.
ClientError: If Lake Formation operations fail.
RuntimeError: If a phase fails and subsequent phases cannot proceed.
"""
# Refresh to get latest state
self.refresh()

# Validate offline store exists
if self.offline_store_config is None or self.offline_store_config == Unassigned():
raise ValueError(
f"Feature Group '{self.feature_group_name}' does not have an offline store configured. "
"Lake Formation can only be enabled for Feature Groups with offline stores."
)

# Get role ARN from Feature Group config
if self.role_arn is None or self.role_arn == Unassigned():
raise ValueError(
f"Feature Group '{self.feature_group_name}' does not have a role_arn configured. "
"Lake Formation requires a role ARN to grant permissions."
)

# Extract required configuration
s3_config = self.offline_store_config.s3_storage_config
if s3_config is None:
raise ValueError("Offline store S3 configuration is missing")

resolved_s3_uri = s3_config.resolved_output_s3_uri
if resolved_s3_uri is None or resolved_s3_uri == Unassigned():
raise ValueError(
"Resolved S3 URI not available. Ensure the Feature Group is in 'Created' status."
)

data_catalog_config = self.offline_store_config.data_catalog_config
if data_catalog_config is None:
raise ValueError("Data catalog configuration is missing from offline store config")

database_name = data_catalog_config.database
table_name = data_catalog_config.table_name

if not database_name or not table_name:
raise ValueError("Database name and table name are required from data catalog config")

# Execute Lake Formation setup with fail-fast behavior
results = {
"s3_registration": False,
"iam_principal_revoked": False,
"permissions_granted": False,
}

# Phase 1: Register S3 with Lake Formation
try:
results["s3_registration"] = self._register_s3_with_lake_formation(
resolved_s3_uri,
session,
region,
use_service_linked_role=use_service_linked_role,
role_arn=registration_role_arn,
)
except Exception as e:
raise RuntimeError(
f"Failed to register S3 location with Lake Formation. "
f"Subsequent phases skipped. Results: {results}. Error: {e}"
) from e

if not results["s3_registration"]:
raise RuntimeError(
f"Failed to register S3 location with Lake Formation. "
f"Subsequent phases skipped. Results: {results}"
)

# Phase 2: Grant Lake Formation permissions to the role
try:
results["permissions_granted"] = self._grant_lake_formation_permissions(
self.role_arn, database_name, table_name, session, region
)
except Exception as e:
raise RuntimeError(
f"Failed to grant Lake Formation permissions. "
f"Subsequent phases skipped. Results: {results}. Error: {e}"
) from e

if not results["permissions_granted"]:
raise RuntimeError(
f"Failed to grant Lake Formation permissions. "
f"Subsequent phases skipped. Results: {results}"
)

# Phase 3: Revoke IAMAllowedPrincipal permissions
try:
results["iam_principal_revoked"] = self._revoke_iam_allowed_principal(
database_name, table_name, session, region
)
except Exception as e:
raise RuntimeError(
f"Failed to revoke IAMAllowedPrincipal permissions. Results: {results}. Error: {e}"
) from e

if not results["iam_principal_revoked"]:
raise RuntimeError(
f"Failed to revoke IAMAllowedPrincipal permissions. Results: {results}"
)

logger.info(f"Lake Formation setup complete for {self.feature_group_name}: {results}")
return results

def populate_inputs_decorator(create_func):
@functools.wraps(create_func)
def wrapper(*args, **kwargs):
Expand Down Expand Up @@ -13075,6 +13404,7 @@ def create(
description: Optional[StrPipeVar] = Unassigned(),
tags: Optional[List[Tag]] = Unassigned(),
use_pre_prod_offline_store_replicator_lambda: Optional[bool] = Unassigned(),
enable_lake_formation: bool = False,
session: Optional[Session] = None,
region: Optional[StrPipeVar] = None,
) -> Optional["FeatureGroup"]:
Expand All @@ -13093,6 +13423,7 @@ def create(
description: A free-form description of a FeatureGroup.
tags: Tags used to identify Features in each FeatureGroup.
use_pre_prod_offline_store_replicator_lambda:
enable_lake_formation: If True, enables Lake Formation governance for the offline store. Requires offline_store_config and role_arn to be set. The role_arn will be extracted from the Feature Group configuration. Defaults to False.
session: Boto3 session.
region: Region name.

Expand All @@ -13116,6 +13447,17 @@ def create(
S3ConfigNotFoundError: Raised when a configuration file is not found in S3
"""

# Validation for Lake Formation
if enable_lake_formation:
if offline_store_config == Unassigned() or offline_store_config is None:
raise ValueError(
"enable_lake_formation=True requires offline_store_config to be configured"
)
if role_arn == Unassigned() or role_arn is None:
raise ValueError(
"enable_lake_formation=True requires role_arn to be specified"
)

logger.info("Creating feature_group resource.")
client = Base.get_sagemaker_client(
session=session, region_name=region, service_name="sagemaker"
Expand Down Expand Up @@ -13148,7 +13490,14 @@ def create(
response = client.create_feature_group(**operation_input_args)
logger.debug(f"Response: {response}")

return cls.get(feature_group_name=feature_group_name, session=session, region=region)
feature_group = cls.get(feature_group_name=feature_group_name, session=session, region=region)

# Enable Lake Formation if requested
if enable_lake_formation:
feature_group.wait_for_status(target_status="Created")
feature_group.enable_lake_formation(session=session, region=region)

return feature_group

@classmethod
@Base.add_validate_call
Expand Down