Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 86 additions & 3 deletions scripts/convert_capec_map_to_asvs_map.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# This script converts the CAPEC mappings from webapp-mappings YAML files
# to a consolidated CAPEC-to-ASVS mapping file and a ASVS-to-CAPEC mapping file in source dir.
import json
import argparse
import logging
import sys
Expand All @@ -13,6 +14,10 @@
class ConvertVars:
TEMPLATE_FILE_NAME: str = "EDITION-TEMPLATE-VERSION.yaml"
DEFAULT_INPUT_PATH = Path(__file__).parent / "../source/webapp-mappings-3.0.yaml"
DEFAULT_ASVS_JSON_PATH = (
Path(__file__).parent
/ "../cornucopia.owasp.org/data/asvs-5.0/en/OWASP_Application_Security_Verification_Standard_5.0.0_en.json"
)
DEFAULT_OUTPUT_PATH = Path(__file__).parent / "../source"
args: argparse.Namespace

Expand Down Expand Up @@ -123,7 +128,10 @@ def _extract_and_add_asvs_requirements(


def convert_to_output_format(
capec_map: dict[Any, set[str]], parameter: str = "owasp_asvs", meta: dict[str, Any] | None = None
capec_map: dict[Any, set[str]],
parameter: str = "owasp_asvs",
meta: dict[str, Any] | None = None,
enrichment_data: dict[str, dict[str, str]] | None = None,
) -> Dict[str, Any]:
"""
Convert the internal mapping format to the output YAML format.
Expand All @@ -133,7 +141,13 @@ def convert_to_output_format(
output["meta"] = meta

for code, asvs_set in sorted(capec_map.items()):
output[code] = {parameter: sorted(list(asvs_set))}
entry: dict[str, Any] = {parameter: sorted(list(asvs_set))}

# Enrich with extra data if available (e.g., Description, L)
if enrichment_data and code in enrichment_data:
entry.update(enrichment_data[code])

output[code] = entry

return output

Expand All @@ -156,6 +170,24 @@ def load_yaml_file(filepath: Path) -> dict[str, Any]:
return {}


def load_json_file(filepath: Path) -> dict[str, Any]:
"""Load and parse a JSON file."""
try:
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
logging.info("Successfully loaded JSON file: %s", filepath)
return data if data else {}
except FileNotFoundError:
logging.error("File not found: %s", filepath)
return {}
except json.JSONDecodeError as e:
logging.error("Error parsing JSON file %s: %s", filepath, str(e))
return {}
except Exception as e:
logging.error("Error loading JSON file %s: %s", filepath, str(e))
return {}


def save_yaml_file(filepath: Path, data: Dict[str, Any]) -> bool:
"""Save data as YAML file."""
try:
Expand Down Expand Up @@ -209,6 +241,12 @@ def parse_arguments(input_args: list[str]) -> argparse.Namespace:
default=ConvertVars.DEFAULT_OUTPUT_PATH,
help="Directory to save converted CAPEC-to-ASVS mapping YAML files",
)
parser.add_argument(
"--asvs-json",
type=validate_filepath_arg,
default=ConvertVars.DEFAULT_ASVS_JSON_PATH,
help="Path to ASVS JSON file for enrichment",
)
parser.add_argument(
"-d",
"--debug",
Expand All @@ -223,6 +261,38 @@ def parse_arguments(input_args: list[str]) -> argparse.Namespace:
return args


def extract_asvs_details(asvs_data: dict[str, Any]) -> dict[str, dict[str, str]]:
"""
Recursively walk the ASVS JSON structure to find requirements and extract their
Description and L (level).

Returns a dict mapping shortcode (e.g. '1.1.1') to details dict.
"""
details = {}

def _walk(node: Any) -> None:
if isinstance(node, dict):
# Check if this node is a requirement item
if "Shortcode" in node and "Description" in node and "L" in node:
# Remove leading 'V' from Shortcode if present
code = node["Shortcode"]
if code.startswith("V"):
code = code[1:]

details[code] = {"description": node["Description"], "level": node["L"]}

# Recurse into children
for key, value in node.items():
_walk(value)
elif isinstance(node, list):
for item in node:
_walk(item)

_walk(asvs_data)
logging.info("Extracted details for %d ASVS requirements", len(details))
return details


def main() -> None:
"""Main execution function."""
convert_vars.args = parse_arguments(sys.argv[1:])
Expand Down Expand Up @@ -260,6 +330,16 @@ def main() -> None:
logging.error("Failed to load input data or file is empty")
sys.exit(1)

# Load ASVS JSON for enrichment
asvs_details = {}
if convert_vars.args.asvs_json:
asvs_json_path = Path(convert_vars.args.asvs_json).resolve()
asvs_data = load_json_file(asvs_json_path)
if asvs_data:
asvs_details = extract_asvs_details(asvs_data)
else:
logging.warning("Failed to load ASVS JSON, skipping enrichment")

# Extract meta information
meta = data.get("meta", {}).copy()
if meta:
Expand All @@ -282,7 +362,10 @@ def main() -> None:
logging.info("Total CAPEC codes processed: %d", len(output_data) - (1 if meta else 0))

asvs_to_capec_map = extract_asvs_to_capec_mappings(data)
output_data_asvs = convert_to_output_format(asvs_to_capec_map, parameter="capec_codes", meta=meta)
# Pass enrichment data here
output_data_asvs = convert_to_output_format(
asvs_to_capec_map, parameter="capec_codes", meta=meta, enrichment_data=asvs_details
)
# Save output YAML
if not save_yaml_file(asvs_output_path, output_data_asvs):
logging.error("Failed to save asvs output file")
Expand Down
Loading
Loading