From 0d9910aec3cce792080371243ba1b70cd3199e64 Mon Sep 17 00:00:00 2001 From: Shree Date: Mon, 29 Dec 2025 13:49:44 +0530 Subject: [PATCH] [cli] Add dependency import command for multi-ecosystem support Implements `cortex import` command that parses and installs dependencies from requirements.txt, package.json, Gemfile, Cargo.toml, and go.mod files. Features: - Dry-run by default, --execute flag to install - --dev flag to include dev dependencies - --all flag to scan directory for all dependency files - Y/n confirmation for --all --execute - 90% test coverage with 80 unit tests Closes #126 --- cortex/cli.py | 277 ++++++++- cortex/dependency_importer.py | 911 ++++++++++++++++++++++++++++ docs/DEPENDENCY_IMPORT.md | 377 ++++++++++++ tests/test_dependency_importer.py | 949 ++++++++++++++++++++++++++++++ 4 files changed, 2513 insertions(+), 1 deletion(-) create mode 100644 cortex/dependency_importer.py create mode 100644 docs/DEPENDENCY_IMPORT.md create mode 100644 tests/test_dependency_importer.py diff --git a/cortex/cli.py b/cortex/cli.py index 274a4f55..b713b92f 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -8,8 +8,14 @@ from cortex.ask import AskHandler from cortex.branding import VERSION, console, cx_header, cx_print, show_banner -from cortex.coordinator import InstallationCoordinator, StepStatus +from cortex.coordinator import InstallationCoordinator, InstallationStep, StepStatus from cortex.demo import run_demo +from cortex.dependency_importer import ( + DependencyImporter, + PackageEcosystem, + ParseResult, + format_package_list, +) from cortex.env_manager import EnvironmentManager, get_env_manager from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType from cortex.llm.interpreter import CommandInterpreter @@ -1064,6 +1070,243 @@ def _env_load(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> in return 0 + # --- Import Dependencies Command --- + def import_deps(self, args: argparse.Namespace) -> int: + 
"""Import and install dependencies from package manager files. + + Supports: requirements.txt (Python), package.json (Node), + Gemfile (Ruby), Cargo.toml (Rust), go.mod (Go) + """ + file_path = getattr(args, "file", None) + scan_all = getattr(args, "all", False) + execute = getattr(args, "execute", False) + include_dev = getattr(args, "dev", False) + + importer = DependencyImporter() + + # Handle --all flag: scan directory for all dependency files + if scan_all: + return self._import_all(importer, execute, include_dev) + + # Handle single file import + if not file_path: + self._print_error("Please specify a dependency file or use --all to scan directory") + cx_print("Usage: cortex import [--execute] [--dev]", "info") + cx_print(" cortex import --all [--execute] [--dev]", "info") + return 1 + + return self._import_single_file(importer, file_path, execute, include_dev) + + def _import_single_file( + self, importer: DependencyImporter, file_path: str, execute: bool, include_dev: bool + ) -> int: + """Import dependencies from a single file.""" + result = importer.parse(file_path, include_dev=include_dev) + + # Display parsing results + self._display_parse_result(result, include_dev) + + if result.errors: + for error in result.errors: + self._print_error(error) + return 1 + + if not result.packages and not result.dev_packages: + cx_print("No packages found in file", "info") + return 0 + + # Get install command + install_cmd = importer.get_install_command(result.ecosystem, file_path) + if not install_cmd: + self._print_error(f"Unknown ecosystem: {result.ecosystem.value}") + return 1 + + # Dry run mode (default) + if not execute: + console.print(f"\n[bold]Install command:[/bold] {install_cmd}") + cx_print("\nTo install these packages, run with --execute flag", "info") + cx_print(f"Example: cortex import {file_path} --execute", "info") + return 0 + + # Execute mode - run the install command + return self._execute_install(install_cmd, result.ecosystem) + + def 
_import_all(self, importer: DependencyImporter, execute: bool, include_dev: bool) -> int: + """Scan directory and import all dependency files.""" + cx_print("Scanning directory...", "info") + + results = importer.scan_directory(include_dev=include_dev) + + if not results: + cx_print("No dependency files found in current directory", "info") + return 0 + + # Display all found files + total_packages = 0 + total_dev_packages = 0 + + for file_path, result in results.items(): + filename = os.path.basename(file_path) + if result.errors: + console.print(f" [red]✗[/red] {filename} (error: {result.errors[0]})") + else: + pkg_count = result.prod_count + dev_count = result.dev_count if include_dev else 0 + total_packages += pkg_count + total_dev_packages += dev_count + dev_str = f" + {dev_count} dev" if dev_count > 0 else "" + console.print(f" [green]✓[/green] {filename} ({pkg_count} packages{dev_str})") + + console.print() + + if total_packages == 0 and total_dev_packages == 0: + cx_print("No packages found in dependency files", "info") + return 0 + + # Generate install commands + commands = importer.get_install_commands_for_results(results) + + if not commands: + cx_print("No install commands generated", "info") + return 0 + + # Dry run mode (default) + if not execute: + console.print("[bold]Install commands:[/bold]") + for cmd_info in commands: + console.print(f" • {cmd_info['command']}") + console.print() + cx_print("To install all packages, run with --execute flag", "info") + cx_print("Example: cortex import --all --execute", "info") + return 0 + + # Execute mode - confirm before installing + total = total_packages + total_dev_packages + confirm = input(f"\nInstall all {total} packages? 
[Y/n]: ") + if confirm.lower() not in ["", "y", "yes"]: + cx_print("Installation cancelled", "info") + return 0 + + # Execute all install commands + return self._execute_multi_install(commands) + + def _display_parse_result(self, result: ParseResult, include_dev: bool) -> None: + """Display the parsed packages from a dependency file.""" + ecosystem_names = { + PackageEcosystem.PYTHON: "Python", + PackageEcosystem.NODE: "Node", + PackageEcosystem.RUBY: "Ruby", + PackageEcosystem.RUST: "Rust", + PackageEcosystem.GO: "Go", + } + + ecosystem_name = ecosystem_names.get(result.ecosystem, "Unknown") + filename = os.path.basename(result.file_path) + + cx_print(f"\n📋 Found {result.prod_count} {ecosystem_name} packages", "info") + + if result.packages: + console.print("\n[bold]Packages:[/bold]") + for pkg in result.packages[:15]: # Show first 15 + version_str = f" ({pkg.version})" if pkg.version else "" + console.print(f" • {pkg.name}{version_str}") + if len(result.packages) > 15: + console.print(f" [dim]... and {len(result.packages) - 15} more[/dim]") + + if include_dev and result.dev_packages: + console.print(f"\n[bold]Dev packages:[/bold] ({result.dev_count})") + for pkg in result.dev_packages[:10]: + version_str = f" ({pkg.version})" if pkg.version else "" + console.print(f" • {pkg.name}{version_str}") + if len(result.dev_packages) > 10: + console.print(f" [dim]... 
and {len(result.dev_packages) - 10} more[/dim]") + + if result.warnings: + console.print() + for warning in result.warnings: + cx_print(f"⚠ {warning}", "warning") + + def _execute_install(self, command: str, ecosystem: PackageEcosystem) -> int: + """Execute a single install command.""" + ecosystem_names = { + PackageEcosystem.PYTHON: "Python", + PackageEcosystem.NODE: "Node", + PackageEcosystem.RUBY: "Ruby", + PackageEcosystem.RUST: "Rust", + PackageEcosystem.GO: "Go", + } + + ecosystem_name = ecosystem_names.get(ecosystem, "") + cx_print(f"\n✓ Installing {ecosystem_name} packages...", "success") + + def progress_callback(current: int, total: int, step: InstallationStep) -> None: + status_emoji = "⏳" + if step.status == StepStatus.SUCCESS: + status_emoji = "✅" + elif step.status == StepStatus.FAILED: + status_emoji = "❌" + console.print(f"[{current}/{total}] {status_emoji} {step.description}") + + coordinator = InstallationCoordinator( + commands=[command], + descriptions=[f"Install {ecosystem_name} packages"], + timeout=600, # 10 minutes for package installation + stop_on_error=True, + progress_callback=progress_callback, + ) + + result = coordinator.execute() + + if result.success: + self._print_success(f"{ecosystem_name} packages installed successfully!") + console.print(f"Completed in {result.total_duration:.2f} seconds") + return 0 + else: + self._print_error("Installation failed") + if result.error_message: + console.print(f"Error: {result.error_message}", style="red") + return 1 + + def _execute_multi_install(self, commands: list[dict[str, str]]) -> int: + """Execute multiple install commands.""" + all_commands = [cmd["command"] for cmd in commands] + all_descriptions = [cmd["description"] for cmd in commands] + + def progress_callback(current: int, total: int, step: InstallationStep) -> None: + status_emoji = "⏳" + if step.status == StepStatus.SUCCESS: + status_emoji = "✅" + elif step.status == StepStatus.FAILED: + status_emoji = "❌" + 
console.print(f"\n[{current}/{total}] {status_emoji} {step.description}") + console.print(f" Command: {step.command}") + + coordinator = InstallationCoordinator( + commands=all_commands, + descriptions=all_descriptions, + timeout=600, + stop_on_error=True, + progress_callback=progress_callback, + ) + + console.print("\n[bold]Installing packages...[/bold]") + result = coordinator.execute() + + if result.success: + self._print_success("\nAll packages installed successfully!") + console.print(f"Completed in {result.total_duration:.2f} seconds") + return 0 + else: + if result.failed_step is not None: + self._print_error(f"\nInstallation failed at step {result.failed_step + 1}") + else: + self._print_error("\nInstallation failed") + if result.error_message: + console.print(f"Error: {result.error_message}", style="red") + return 1 + + # -------------------------- + def show_rich_help(): """Display beautifully formatted help using Rich""" @@ -1086,6 +1329,7 @@ def show_rich_help(): table.add_row("wizard", "Configure API key") table.add_row("status", "System status") table.add_row("install ", "Install software") + table.add_row("import ", "Import deps from package files") table.add_row("history", "View history") table.add_row("rollback ", "Undo installation") table.add_row("notify", "Manage desktop notifications") @@ -1184,6 +1428,35 @@ def main(): help="Enable parallel execution for multi-step installs", ) + # Import command - import dependencies from package manager files + import_parser = subparsers.add_parser( + "import", + help="Import and install dependencies from package files", + ) + import_parser.add_argument( + "file", + nargs="?", + help="Dependency file (requirements.txt, package.json, Gemfile, Cargo.toml, go.mod)", + ) + import_parser.add_argument( + "--all", + "-a", + action="store_true", + help="Scan directory for all dependency files", + ) + import_parser.add_argument( + "--execute", + "-e", + action="store_true", + help="Execute install commands (default: 
dry-run)", + ) + import_parser.add_argument( + "--dev", + "-d", + action="store_true", + help="Include dev dependencies", + ) + # History command history_parser = subparsers.add_parser("history", help="View history") history_parser.add_argument("--limit", type=int, default=20) @@ -1347,6 +1620,8 @@ def main(): dry_run=args.dry_run, parallel=args.parallel, ) + elif args.command == "import": + return cli.import_deps(args) elif args.command == "history": return cli.history(limit=args.limit, status=args.status, show_id=args.show_id) elif args.command == "rollback": diff --git a/cortex/dependency_importer.py b/cortex/dependency_importer.py new file mode 100644 index 00000000..91a7ca34 --- /dev/null +++ b/cortex/dependency_importer.py @@ -0,0 +1,911 @@ +#!/usr/bin/env python3 +""" +Dependency Importer Module + +Parses dependency files from multiple ecosystems and provides unified installation. +Supports: requirements.txt (Python), package.json (Node), Gemfile (Ruby), + Cargo.toml (Rust), go.mod (Go) +""" + +import json +import os +import re +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path + + +class PackageEcosystem(Enum): + """Supported package ecosystems.""" + + PYTHON = "python" + NODE = "node" + RUBY = "ruby" + RUST = "rust" + GO = "go" + UNKNOWN = "unknown" + + +@dataclass +class Package: + """Represents a parsed package dependency.""" + + name: str + version: str | None = None + ecosystem: PackageEcosystem = PackageEcosystem.UNKNOWN + is_dev: bool = False + extras: list[str] = field(default_factory=list) + source: str | None = None # git URL, path, etc. 
+ group: str | None = None # For Gemfile groups + features: list[str] = field(default_factory=list) # For Cargo.toml + is_indirect: bool = False # For go.mod indirect deps + is_optional: bool = False + + def __str__(self) -> str: + version_str = f"@{self.version}" if self.version else "" + return f"{self.name}{version_str}" + + +@dataclass +class ParseResult: + """Result of parsing a dependency file.""" + + file_path: str + ecosystem: PackageEcosystem + packages: list[Package] + dev_packages: list[Package] = field(default_factory=list) + errors: list[str] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) + + @property + def total_count(self) -> int: + """Total number of packages (prod + dev).""" + return len(self.packages) + len(self.dev_packages) + + @property + def prod_count(self) -> int: + """Number of production packages.""" + return len(self.packages) + + @property + def dev_count(self) -> int: + """Number of dev packages.""" + return len(self.dev_packages) + + +# Mapping of filenames to ecosystems +DEPENDENCY_FILES = { + "requirements.txt": PackageEcosystem.PYTHON, + "requirements-dev.txt": PackageEcosystem.PYTHON, + "requirements-test.txt": PackageEcosystem.PYTHON, + "requirements_dev.txt": PackageEcosystem.PYTHON, + "requirements_test.txt": PackageEcosystem.PYTHON, + "dev-requirements.txt": PackageEcosystem.PYTHON, + "test-requirements.txt": PackageEcosystem.PYTHON, + "package.json": PackageEcosystem.NODE, + "Gemfile": PackageEcosystem.RUBY, + "Cargo.toml": PackageEcosystem.RUST, + "go.mod": PackageEcosystem.GO, +} + +# Install commands for each ecosystem +INSTALL_COMMANDS = { + PackageEcosystem.PYTHON: "pip install -r {file}", + PackageEcosystem.NODE: "npm install", + PackageEcosystem.RUBY: "bundle install", + PackageEcosystem.RUST: "cargo build", + PackageEcosystem.GO: "go mod download", +} + + +class DependencyImporter: + """Parses and imports dependencies from various package manager files.""" + + def __init__(self, 
base_path: str | None = None): + """Initialize the importer. + + Args: + base_path: Base directory for resolving relative paths. + Defaults to current working directory. + """ + self.base_path = Path(base_path) if base_path else Path.cwd() + self._visited_files: set[str] = set() # Track visited files for -r includes + + def detect_ecosystem(self, file_path: str) -> PackageEcosystem: + """Detect the ecosystem based on filename. + + Args: + file_path: Path to the dependency file. + + Returns: + The detected PackageEcosystem or UNKNOWN. + """ + filename = os.path.basename(file_path) + + # Exact match + if filename in DEPENDENCY_FILES: + return DEPENDENCY_FILES[filename] + + # Pattern matching for requirements*.txt + if filename.startswith("requirements") and filename.endswith(".txt"): + return PackageEcosystem.PYTHON + + return PackageEcosystem.UNKNOWN + + def parse(self, file_path: str, include_dev: bool = False) -> ParseResult: + """Parse a dependency file and extract packages. + + Args: + file_path: Path to the dependency file. + include_dev: Whether to include dev dependencies. + + Returns: + ParseResult containing packages and any errors. 
+ """ + path = Path(file_path) + if not path.is_absolute(): + path = self.base_path / path + + ecosystem = self.detect_ecosystem(str(path)) + + if not path.exists(): + return ParseResult( + file_path=str(path), + ecosystem=ecosystem, + packages=[], + errors=[f"File not found: {path}"], + ) + + try: + if ecosystem == PackageEcosystem.PYTHON: + return self._parse_requirements_txt(path, include_dev) + elif ecosystem == PackageEcosystem.NODE: + return self._parse_package_json(path, include_dev) + elif ecosystem == PackageEcosystem.RUBY: + return self._parse_gemfile(path, include_dev) + elif ecosystem == PackageEcosystem.RUST: + return self._parse_cargo_toml(path, include_dev) + elif ecosystem == PackageEcosystem.GO: + return self._parse_go_mod(path, include_dev) + else: + return ParseResult( + file_path=str(path), + ecosystem=ecosystem, + packages=[], + errors=[f"Unknown file type: {path.name}"], + ) + except Exception as e: + return ParseResult( + file_path=str(path), + ecosystem=ecosystem, + packages=[], + errors=[f"Parse error: {str(e)}"], + ) + + def _parse_requirements_txt(self, path: Path, include_dev: bool = False) -> ParseResult: + """Parse Python requirements.txt file. 
+ + Handles: + - Package names with version specifiers (==, >=, <=, ~=, !=, <, >) + - Comments (#) + - Extras (package[extra1,extra2]) + - -r includes (recursive file imports) + - Environment markers (; python_version >= "3.8") + - Git URLs and editable installs (-e) + """ + packages: list[Package] = [] + dev_packages: list[Package] = [] + errors: list[str] = [] + warnings: list[str] = [] + + # Prevent circular includes + abs_path = str(path.resolve()) + if abs_path in self._visited_files: + warnings.append(f"Skipping circular include: {path}") + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.PYTHON, + packages=[], + warnings=warnings, + ) + self._visited_files.add(abs_path) + + # Detect if this is a dev requirements file + is_dev_file = any(x in path.name.lower() for x in ["dev", "test", "development", "testing"]) + + try: + content = path.read_text(encoding="utf-8") + except UnicodeDecodeError: + content = path.read_text(encoding="latin-1") + + for line_num, line in enumerate(content.splitlines(), 1): + line = line.strip() + + # Skip empty lines and comments + if not line or line.startswith("#"): + continue + + # Handle -r includes + if line.startswith("-r ") or line.startswith("--requirement "): + include_path = line.split(maxsplit=1)[1].strip() + include_full_path = path.parent / include_path + if include_full_path.exists(): + sub_result = self._parse_requirements_txt(include_full_path, include_dev) + packages.extend(sub_result.packages) + dev_packages.extend(sub_result.dev_packages) + errors.extend(sub_result.errors) + warnings.extend(sub_result.warnings) + else: + warnings.append(f"Line {line_num}: Include file not found: {include_path}") + continue + + # Skip options like --index-url, --extra-index-url, --trusted-host + if line.startswith("-") and not line.startswith(("-e", "--editable")): + continue + + # Handle editable installs (-e) + if line.startswith("-e ") or line.startswith("--editable "): + source = line.split(maxsplit=1)[1].strip() + # Try
to extract package name from path or URL + pkg_name = self._extract_name_from_source(source) + if pkg_name: + pkg = Package( + name=pkg_name, + ecosystem=PackageEcosystem.PYTHON, + source=source, + is_dev=is_dev_file, + ) + if is_dev_file: + dev_packages.append(pkg) + else: + packages.append(pkg) + else: + warnings.append(f"Line {line_num}: Could not parse editable install: {line}") + continue + + # Handle git URLs without -e + if line.startswith(("git+", "hg+", "svn+", "bzr+")): + pkg_name = self._extract_name_from_source(line) + if pkg_name: + pkg = Package( + name=pkg_name, + ecosystem=PackageEcosystem.PYTHON, + source=line, + is_dev=is_dev_file, + ) + if is_dev_file: + dev_packages.append(pkg) + else: + packages.append(pkg) + continue + + # Parse standard package specifier + pkg = self._parse_python_requirement(line, is_dev_file) + if pkg: + if is_dev_file: + dev_packages.append(pkg) + else: + packages.append(pkg) + + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.PYTHON, + packages=packages, + dev_packages=dev_packages if include_dev else [], + errors=errors, + warnings=warnings, + ) + + def _parse_python_requirement(self, line: str, is_dev: bool = False) -> Package | None: + """Parse a single Python requirement line. 
+ + Examples: + - requests + - requests==2.28.0 + - requests>=2.20,<3.0 + - requests[security,socks]>=2.20 + - requests; python_version >= "3.8" + """ + # Remove environment markers + if ";" in line: + line = line.split(";")[0].strip() + + # Match package name with optional extras and version + # Pattern: name[extras]version_spec + pattern = r"^([a-zA-Z0-9][-a-zA-Z0-9._]*)(\[[^\]]+\])?\s*(.*)$" + match = re.match(pattern, line) + + if not match: + return None + + name = match.group(1) + extras_str = match.group(2) + version_spec = match.group(3).strip() if match.group(3) else None + + extras: list[str] = [] + if extras_str: + # Remove brackets and split by comma + extras = [e.strip() for e in extras_str[1:-1].split(",")] + + # Clean version spec (remove comparison operators for display) + version = None + if version_spec: + # Extract version number from spec like "==2.0.0" or ">=1.0,<2.0" + version_match = re.search(r"[=<>!~]+\s*([0-9][0-9a-zA-Z._-]*)", version_spec) + if version_match: + version = version_spec # Keep full spec for accuracy + + return Package( + name=name, + version=version, + ecosystem=PackageEcosystem.PYTHON, + extras=extras, + is_dev=is_dev, + ) + + def _extract_name_from_source(self, source: str) -> str | None: + """Extract package name from git URL or path.""" + # Handle egg= fragment + if "#egg=" in source: + return source.split("#egg=")[1].split("&")[0] + + # Handle git URLs + if "github.com" in source or "gitlab.com" in source or "bitbucket.org" in source: + # Extract repo name from URL + match = re.search(r"/([^/]+?)(?:\.git)?(?:@|#|$)", source) + if match: + return match.group(1) + + # Handle local paths + if source.startswith("./") or source.startswith("../") or source.startswith("/"): + return os.path.basename(source.rstrip("/")) + + return None + + def _parse_package_json(self, path: Path, include_dev: bool = False) -> ParseResult: + """Parse Node.js package.json file. 
+ + Handles: + - dependencies + - devDependencies + - peerDependencies (as warnings) + - optionalDependencies + - Scoped packages (@scope/name) + - Version ranges (^, ~, >=, *, latest) + - Git URLs and local paths + """ + packages: list[Package] = [] + dev_packages: list[Package] = [] + errors: list[str] = [] + warnings: list[str] = [] + + try: + content = path.read_text(encoding="utf-8") + data = json.loads(content) + except json.JSONDecodeError as e: + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.NODE, + packages=[], + errors=[f"Invalid JSON: {str(e)}"], + ) + except Exception as e: + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.NODE, + packages=[], + errors=[f"Read error: {str(e)}"], + ) + + # Parse production dependencies + deps = data.get("dependencies", {}) + for name, version in deps.items(): + pkg = Package( + name=name, + version=version, + ecosystem=PackageEcosystem.NODE, + is_dev=False, + ) + # Check if it's a git or file URL + if version.startswith(("git", "github:", "gitlab:", "file:")): + pkg.source = version + packages.append(pkg) + + # Parse dev dependencies + dev_deps = data.get("devDependencies", {}) + for name, version in dev_deps.items(): + pkg = Package( + name=name, + version=version, + ecosystem=PackageEcosystem.NODE, + is_dev=True, + ) + if version.startswith(("git", "github:", "gitlab:", "file:")): + pkg.source = version + dev_packages.append(pkg) + + # Warn about peer dependencies + peer_deps = data.get("peerDependencies", {}) + if peer_deps: + warnings.append( + f"Found {len(peer_deps)} peer dependencies (not auto-installed): " + f"{', '.join(list(peer_deps.keys())[:3])}..." 
+ ) + + # Parse optional dependencies + optional_deps = data.get("optionalDependencies", {}) + for name, version in optional_deps.items(): + pkg = Package( + name=name, + version=version, + ecosystem=PackageEcosystem.NODE, + is_dev=False, + is_optional=True, + ) + packages.append(pkg) + + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.NODE, + packages=packages, + dev_packages=dev_packages if include_dev else [], + errors=errors, + warnings=warnings, + ) + + def _parse_gemfile(self, path: Path, include_dev: bool = False) -> ParseResult: + """Parse Ruby Gemfile. + + Handles: + - gem declarations with versions + - Groups (:development, :test, :production) + - Path-based gems (path: './local') + - Git-based gems (git: 'https://...') + - Source declarations + """ + packages: list[Package] = [] + dev_packages: list[Package] = [] + errors: list[str] = [] + warnings: list[str] = [] + + try: + content = path.read_text(encoding="utf-8") + except Exception as e: + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.RUBY, + packages=[], + errors=[f"Read error: {str(e)}"], + ) + + current_groups: list[str] = [] + in_group_block = False + + for line_num, line in enumerate(content.splitlines(), 1): + line = line.strip() + + # Skip empty lines and comments + if not line or line.startswith("#"): + continue + + # Handle source declarations + if line.startswith("source "): + continue + + # Handle ruby version + if line.startswith("ruby "): + continue + + # Handle group blocks + group_match = re.match(r"group\s+(.+?)\s+do", line) + if group_match: + groups_str = group_match.group(1) + # Parse group symbols like :development, :test + current_groups = re.findall(r":(\w+)", groups_str) + in_group_block = True + continue + + if line == "end" and in_group_block: + current_groups = [] + in_group_block = False + continue + + # Parse gem declarations + gem_match = re.match(r"gem\s+['\"]([^'\"]+)['\"](.*)$", line) + if gem_match: + name = 
gem_match.group(1) + rest = gem_match.group(2).strip() if gem_match.group(2) else "" + + version = None + source = None + groups = current_groups.copy() + + # Extract version if present (e.g., gem 'rails', '~> 7.0') + version_match = re.search(r",\s*['\"]([^'\"]+)['\"]", rest) + if version_match: + version = version_match.group(1) + + # Extract path or git source + path_match = re.search(r"path:\s*['\"]([^'\"]+)['\"]", rest) + if path_match: + source = f"path:{path_match.group(1)}" + + git_match = re.search(r"git:\s*['\"]([^'\"]+)['\"]", rest) + if git_match: + source = f"git:{git_match.group(1)}" + + # Extract inline group + inline_group = re.search(r"group:\s*\[?([^\]]+)\]?", rest) + if inline_group: + groups.extend(re.findall(r":(\w+)", inline_group.group(1))) + + # Determine if dev dependency + is_dev = any(g in ["development", "test", "dev"] for g in groups) + + pkg = Package( + name=name, + version=version, + ecosystem=PackageEcosystem.RUBY, + is_dev=is_dev, + source=source, + group=",".join(groups) if groups else None, + ) + + if is_dev: + dev_packages.append(pkg) + else: + packages.append(pkg) + + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.RUBY, + packages=packages, + dev_packages=dev_packages if include_dev else [], + errors=errors, + warnings=warnings, + ) + + def _parse_cargo_toml(self, path: Path, include_dev: bool = False) -> ParseResult: + """Parse Rust Cargo.toml file. 
+ + Handles: + - [dependencies] + - [dev-dependencies] + - [build-dependencies] + - Inline tables { version = "1.0", features = ["full"] } + - Path and git dependencies + - Optional dependencies + """ + packages: list[Package] = [] + dev_packages: list[Package] = [] + errors: list[str] = [] + warnings: list[str] = [] + + try: + content = path.read_text(encoding="utf-8") + except Exception as e: + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.RUST, + packages=[], + errors=[f"Read error: {str(e)}"], + ) + + # Simple TOML parsing (without external library) + current_section = "" + current_is_dev = False + + for line_num, line in enumerate(content.splitlines(), 1): + line = line.strip() + + # Skip empty lines and comments + if not line or line.startswith("#"): + continue + + # Section headers + if line.startswith("[") and line.endswith("]"): + current_section = line[1:-1].lower() + current_is_dev = current_section in [ + "dev-dependencies", + "build-dependencies", + ] + continue + + # Skip non-dependency sections + if current_section not in [ + "dependencies", + "dev-dependencies", + "build-dependencies", + ]: + # Check for target-specific dependencies + if ".dependencies" in current_section: + pass # Process it + else: + continue + + # Parse dependency line + if "=" not in line: + continue + + parts = line.split("=", 1) + name = parts[0].strip() + value = parts[1].strip() + + version = None + features: list[str] = [] + source = None + is_optional = False + + # Simple string version: serde = "1.0" + if value.startswith('"') and value.endswith('"'): + version = value[1:-1] + # Inline table: tokio = { version = "1", features = ["full"] } + elif value.startswith("{"): + # Extract version + ver_match = re.search(r'version\s*=\s*"([^"]+)"', value) + if ver_match: + version = ver_match.group(1) + + # Extract features + feat_match = re.search(r"features\s*=\s*\[([^\]]+)\]", value) + if feat_match: + features = [f.strip().strip('"') for f in 
feat_match.group(1).split(",")] + + # Extract path + path_match = re.search(r'path\s*=\s*"([^"]+)"', value) + if path_match: + source = f"path:{path_match.group(1)}" + + # Extract git + git_match = re.search(r'git\s*=\s*"([^"]+)"', value) + if git_match: + source = f"git:{git_match.group(1)}" + + # Check optional + if "optional = true" in value.lower(): + is_optional = True + + pkg = Package( + name=name, + version=version, + ecosystem=PackageEcosystem.RUST, + is_dev=current_is_dev, + features=features, + source=source, + is_optional=is_optional, + ) + + if current_is_dev: + dev_packages.append(pkg) + else: + packages.append(pkg) + + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.RUST, + packages=packages, + dev_packages=dev_packages if include_dev else [], + errors=errors, + warnings=warnings, + ) + + def _parse_go_mod(self, path: Path, include_dev: bool = False) -> ParseResult: + """Parse Go go.mod file. + + Handles: + - require statements (single and block) + - // indirect comments + - replace directives (as warnings) + - exclude directives (as warnings) + - Go version requirements + """ + packages: list[Package] = [] + errors: list[str] = [] + warnings: list[str] = [] + + try: + content = path.read_text(encoding="utf-8") + except Exception as e: + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.GO, + packages=[], + errors=[f"Read error: {str(e)}"], + ) + + in_require_block = False + replace_count = 0 + exclude_count = 0 + + for line_num, line in enumerate(content.splitlines(), 1): + original_line = line + line = line.strip() + + # Skip empty lines and pure comments + if not line or (line.startswith("//") and "indirect" not in line): + continue + + # Module declaration + if line.startswith("module "): + continue + + # Go version + if line.startswith("go "): + continue + + # Replace directives + if line.startswith("replace "): + replace_count += 1 + continue + + # Exclude directives + if line.startswith("exclude 
"): + exclude_count += 1 + continue + + # Require block start + if line.startswith("require ("): + in_require_block = True + continue + + # Block end + if line == ")": + in_require_block = False + continue + + # Single require statement + if line.startswith("require "): + parts = line[8:].strip().split() + if len(parts) >= 2: + name = parts[0] + version = parts[1] + is_indirect = "// indirect" in original_line + pkg = Package( + name=name, + version=version, + ecosystem=PackageEcosystem.GO, + is_indirect=is_indirect, + ) + packages.append(pkg) + continue + + # Dependencies inside require block + if in_require_block: + parts = line.split() + if len(parts) >= 2: + name = parts[0] + version = parts[1] + is_indirect = "// indirect" in original_line + pkg = Package( + name=name, + version=version, + ecosystem=PackageEcosystem.GO, + is_indirect=is_indirect, + ) + packages.append(pkg) + + # Add warnings for replace/exclude + if replace_count > 0: + warnings.append(f"Found {replace_count} replace directive(s)") + if exclude_count > 0: + warnings.append(f"Found {exclude_count} exclude directive(s)") + + return ParseResult( + file_path=str(path), + ecosystem=PackageEcosystem.GO, + packages=packages, + dev_packages=[], # Go doesn't distinguish dev deps in go.mod + errors=errors, + warnings=warnings, + ) + + def scan_directory( + self, directory: str | None = None, include_dev: bool = False + ) -> dict[str, ParseResult]: + """Scan a directory for all supported dependency files. + + Args: + directory: Directory to scan. Defaults to base_path. + include_dev: Whether to include dev dependencies. + + Returns: + Dict mapping file paths to their ParseResults. 
+ """ + scan_path = Path(directory) if directory else self.base_path + results: dict[str, ParseResult] = {} + + for filename in DEPENDENCY_FILES: + file_path = scan_path / filename + if file_path.exists(): + result = self.parse(str(file_path), include_dev) + if result.packages or result.dev_packages or result.errors: + results[str(file_path)] = result + + return results + + def get_install_command( + self, ecosystem: PackageEcosystem, file_path: str | None = None + ) -> str | None: + """Get the appropriate install command for an ecosystem. + + Args: + ecosystem: The package ecosystem. + file_path: Optional file path to include in command. + + Returns: + Install command string or None if unknown ecosystem. + """ + if ecosystem not in INSTALL_COMMANDS: + return None + + cmd = INSTALL_COMMANDS[ecosystem] + if "{file}" in cmd and file_path: + return cmd.format(file=file_path) + return cmd + + def get_install_commands_for_results( + self, results: dict[str, ParseResult] + ) -> list[dict[str, str]]: + """Generate install commands for multiple parse results. + + Args: + results: Dict of file paths to ParseResults. + + Returns: + List of dicts with 'command' and 'description' keys. 
+ """ + commands: list[dict[str, str]] = [] + seen_ecosystems: set[PackageEcosystem] = set() + + for file_path, result in results.items(): + if result.errors: + continue + + ecosystem = result.ecosystem + + # For Python, we use pip install -r for each file + if ecosystem == PackageEcosystem.PYTHON: + if result.packages or result.dev_packages: + cmd = self.get_install_command(ecosystem, file_path) + if cmd: + commands.append( + { + "command": cmd, + "description": f"Install Python packages from {os.path.basename(file_path)}", + } + ) + # For other ecosystems, one command per ecosystem + elif ecosystem not in seen_ecosystems: + cmd = self.get_install_command(ecosystem) + if cmd and (result.packages or result.dev_packages): + commands.append( + { + "command": cmd, + "description": f"Install {ecosystem.value.title()} packages", + } + ) + seen_ecosystems.add(ecosystem) + + return commands + + +def format_package_list(packages: list[Package], max_display: int = 10) -> str: + """Format a list of packages for display. + + Args: + packages: List of Package objects. + max_display: Maximum number to show before truncating. + + Returns: + Formatted string for display. + """ + if not packages: + return "(none)" + + names = [str(pkg) for pkg in packages[:max_display]] + result = ", ".join(names) + + if len(packages) > max_display: + result += f" (+{len(packages) - max_display} more)" + + return result diff --git a/docs/DEPENDENCY_IMPORT.md b/docs/DEPENDENCY_IMPORT.md new file mode 100644 index 00000000..609c19ba --- /dev/null +++ b/docs/DEPENDENCY_IMPORT.md @@ -0,0 +1,377 @@ +# Dependency Import + +Import and install packages from dependency files across multiple ecosystems. + +## Overview + +The `cortex import` command parses dependency files from various package managers and installs the packages. 
It supports: + +- **Python**: `requirements.txt` +- **Node.js**: `package.json` +- **Ruby**: `Gemfile` +- **Rust**: `Cargo.toml` +- **Go**: `go.mod` + +## Usage + +### Basic Usage + +```bash +# Parse and show packages (dry-run by default) +cortex import requirements.txt + +# Actually install packages +cortex import requirements.txt --execute + +# Include dev dependencies +cortex import package.json --dev + +# Scan directory for all dependency files +cortex import --all + +# Scan and install all +cortex import --all --execute +``` + +### Command Options + +| Option | Short | Description | +|--------|-------|-------------| +| `--execute` | `-e` | Execute install commands (default: dry-run) | +| `--dev` | `-d` | Include dev dependencies | +| `--all` | `-a` | Scan directory for all dependency files | + +## Supported Formats + +### Python (requirements.txt) + +Parses standard pip requirements format: + +```txt +# Comments are ignored +requests==2.28.0 +flask>=2.0.0 +django~=4.0 +numpy<2.0,>=1.5 + +# Extras +requests[security,socks] + +# Git URLs +git+https://github.com/user/repo.git#egg=mypackage + +# Editable installs +-e ./local_package + +# Include other files +-r base-requirements.txt +``` + +**Features:** +- Version specifiers (`==`, `>=`, `<=`, `~=`, `!=`, `<`, `>`) +- Extras (`package[extra1,extra2]`) +- Environment markers (`package; python_version >= "3.8"`) +- Recursive includes (`-r other-file.txt`) +- Git URLs and editable installs +- Comments and blank lines + +**Install Command:** `pip install -r ` + +### Node.js (package.json) + +Parses npm/yarn package files: + +```json +{ + "dependencies": { + "express": "^4.18.0", + "@types/node": "^18.0.0" + }, + "devDependencies": { + "jest": "^29.0.0", + "typescript": "^5.0.0" + }, + "optionalDependencies": { + "fsevents": "^2.3.0" + } +} +``` + +**Features:** +- Production dependencies +- Dev dependencies (with `--dev` flag) +- Optional dependencies +- Scoped packages (`@scope/package`) +- Version ranges (`^`, `~`, 
`>=`, `*`, `latest`) +- Git URLs and local paths +- Peer dependencies (shown as warning) + +**Install Command:** `npm install` + +### Ruby (Gemfile) + +Parses Bundler Gemfiles: + +```ruby +source 'https://rubygems.org' + +ruby '3.2.0' + +gem 'rails', '~> 7.0' +gem 'pg', '>= 1.0' + +group :development, :test do + gem 'rspec' + gem 'factory_bot' +end + +gem 'rubocop', group: :development +gem 'local_gem', path: './gems/local' +gem 'git_gem', git: 'https://github.com/user/repo.git' +``` + +**Features:** +- Gem declarations with versions +- Groups (`:development`, `:test`, `:production`) +- Inline group syntax +- Path-based gems +- Git-based gems +- Source declarations +- Ruby version requirements (ignored) + +**Install Command:** `bundle install` + +### Rust (Cargo.toml) + +Parses Cargo manifest files: + +```toml +[package] +name = "my_project" +version = "0.1.0" + +[dependencies] +serde = "1.0" +tokio = { version = "1.0", features = ["full"] } +local_crate = { path = "../local" } +git_crate = { git = "https://github.com/user/repo.git" } + +[dev-dependencies] +criterion = "0.4" + +[build-dependencies] +cc = "1.0" +``` + +**Features:** +- Simple version strings +- Inline tables with version and features +- Path dependencies +- Git dependencies +- Optional dependencies +- Dev dependencies (with `--dev` flag) +- Build dependencies (treated as dev) + +**Install Command:** `cargo build` + +### Go (go.mod) + +Parses Go module files: + +```go +module example.com/mymodule + +go 1.21 + +require ( + github.com/gin-gonic/gin v1.9.0 + github.com/go-redis/redis/v8 v8.11.5 + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 +) + +require ( + github.com/indirect/dep v1.0.0 // indirect +) + +replace github.com/old => github.com/new v1.0.0 +``` + +**Features:** +- Single and block require statements +- Indirect dependencies (marked with `// indirect`) +- Pseudo-versions (commit hashes) +- Replace directives (shown as warning) +- Exclude directives (shown as warning) + +**Install 
Command:** `go mod download` + +## Examples + +### Single File Import + +```bash +$ cortex import requirements.txt + +📋 Found 25 Python packages + +Packages: + • requests (==2.28.0) + • flask (>=2.0.0) + • django (~=4.0) + ... and 22 more + +Install command: pip install -r requirements.txt + +To install these packages, run with --execute flag +Example: cortex import requirements.txt --execute +``` + +### With Dev Dependencies + +```bash +$ cortex import package.json --dev + +📋 Found 12 Node packages + +Packages: + • express (^4.18.0) + • lodash (~4.17.21) + ... and 3 more + +Dev packages: (7) + • jest (^29.0.0) + • typescript (^5.0.0) + ... and 5 more + +Install command: npm install + +To install these packages, run with --execute flag +Example: cortex import package.json --execute +``` + +### Scan All Dependencies + +```bash +$ cortex import --all +Scanning directory... + ✓ requirements.txt (25 packages) + ✓ package.json (42 packages) + ✓ Gemfile (8 packages) + +Install commands: + • pip install -r requirements.txt + • npm install + • bundle install + +To install all packages, run with --execute flag +Example: cortex import --all --execute +``` + +### Execute Installation + +```bash +$ cortex import --all --execute +Scanning directory... + ✓ requirements.txt (25 packages) + ✓ package.json (42 packages) + +Install all 67 packages? [Y/n]: y + +Installing packages... + +[1/2] ✅ Install Python packages from requirements.txt + Command: pip install -r requirements.txt + +[2/2] ✅ Install Node packages + Command: npm install + +All packages installed successfully! 
+Completed in 45.32 seconds +``` + +## Error Handling + +### File Not Found + +```bash +$ cortex import nonexistent.txt +Error: File not found: nonexistent.txt +``` + +### Invalid JSON + +```bash +$ cortex import package.json +Error: Invalid JSON: Expecting property name enclosed in double quotes +``` + +### Unknown File Type + +```bash +$ cortex import unknown.xyz +Error: Unknown file type: unknown.xyz +``` + +## Programmatic Usage + +The `DependencyImporter` class can be used programmatically: + +```python +from cortex.dependency_importer import DependencyImporter, PackageEcosystem + +# Create importer +importer = DependencyImporter() + +# Parse a single file +result = importer.parse("requirements.txt", include_dev=True) + +print(f"Found {result.prod_count} packages") +for pkg in result.packages: + print(f" - {pkg.name} ({pkg.version})") + +# Scan directory +results = importer.scan_directory(include_dev=True) +for file_path, result in results.items(): + print(f"{file_path}: {result.total_count} packages") + +# Get install commands +commands = importer.get_install_commands_for_results(results) +for cmd in commands: + print(f" {cmd['command']}") +``` + +## Dev Dependencies + +By default, dev dependencies are **not** included. Use the `--dev` flag to include them: + +| Ecosystem | Dev Dependencies | +|-----------|-----------------| +| Python | Files named `*dev*`, `*test*` in filename | +| Node.js | `devDependencies` in package.json | +| Ruby | Groups `:development`, `:test` | +| Rust | `[dev-dependencies]`, `[build-dependencies]` | +| Go | N/A (go.mod doesn't distinguish) | + +## Confirmation Prompt + +When using `--execute` with `--all`, you'll be prompted for confirmation: + +``` +Install all 67 packages? [Y/n]: +``` + +This prevents accidental mass installations. Single file imports with `--execute` do not require confirmation. + +## Limitations + +- **No lock file support**: Uses main dependency files only (not `package-lock.json`, `Gemfile.lock`, etc.) 
+- **No version resolution**: Installs versions as specified in the files +- **No conflict detection**: Doesn't check for version conflicts between ecosystems +- **Network required**: Package installation requires network access + +## Related Commands + +- `cortex install ` - Install a single package with AI assistance +- `cortex stack ` - Install predefined package stacks +- `cortex history` - View installation history +- `cortex rollback ` - Undo an installation diff --git a/tests/test_dependency_importer.py b/tests/test_dependency_importer.py new file mode 100644 index 00000000..d9ff0eed --- /dev/null +++ b/tests/test_dependency_importer.py @@ -0,0 +1,949 @@ +#!/usr/bin/env python3 +""" +Unit tests for the dependency_importer module. + +Tests parsing of: +- requirements.txt (Python) +- package.json (Node) +- Gemfile (Ruby) +- Cargo.toml (Rust) +- go.mod (Go) +""" + +import json +import os +import sys +import tempfile +import unittest +from pathlib import Path + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from cortex.dependency_importer import ( + DEPENDENCY_FILES, + INSTALL_COMMANDS, + DependencyImporter, + Package, + PackageEcosystem, + ParseResult, + format_package_list, +) + + +class TestPackageEcosystem(unittest.TestCase): + """Tests for PackageEcosystem enum.""" + + def test_ecosystem_values(self): + self.assertEqual(PackageEcosystem.PYTHON.value, "python") + self.assertEqual(PackageEcosystem.NODE.value, "node") + self.assertEqual(PackageEcosystem.RUBY.value, "ruby") + self.assertEqual(PackageEcosystem.RUST.value, "rust") + self.assertEqual(PackageEcosystem.GO.value, "go") + self.assertEqual(PackageEcosystem.UNKNOWN.value, "unknown") + + +class TestPackage(unittest.TestCase): + """Tests for Package dataclass.""" + + def test_package_creation(self): + pkg = Package(name="requests", version="2.28.0", ecosystem=PackageEcosystem.PYTHON) + self.assertEqual(pkg.name, "requests") + self.assertEqual(pkg.version, "2.28.0") + 
self.assertEqual(pkg.ecosystem, PackageEcosystem.PYTHON) + self.assertFalse(pkg.is_dev) + + def test_package_str_with_version(self): + pkg = Package(name="requests", version="2.28.0") + self.assertEqual(str(pkg), "requests@2.28.0") + + def test_package_str_without_version(self): + pkg = Package(name="requests") + self.assertEqual(str(pkg), "requests") + + def test_package_defaults(self): + pkg = Package(name="test") + self.assertIsNone(pkg.version) + self.assertEqual(pkg.ecosystem, PackageEcosystem.UNKNOWN) + self.assertFalse(pkg.is_dev) + self.assertEqual(pkg.extras, []) + self.assertIsNone(pkg.source) + self.assertIsNone(pkg.group) + self.assertEqual(pkg.features, []) + self.assertFalse(pkg.is_indirect) + self.assertFalse(pkg.is_optional) + + +class TestParseResult(unittest.TestCase): + """Tests for ParseResult dataclass.""" + + def test_parse_result_counts(self): + packages = [Package(name="pkg1"), Package(name="pkg2")] + dev_packages = [Package(name="dev1")] + result = ParseResult( + file_path="/test/file", + ecosystem=PackageEcosystem.PYTHON, + packages=packages, + dev_packages=dev_packages, + ) + self.assertEqual(result.prod_count, 2) + self.assertEqual(result.dev_count, 1) + self.assertEqual(result.total_count, 3) + + def test_parse_result_empty(self): + result = ParseResult( + file_path="/test/file", + ecosystem=PackageEcosystem.PYTHON, + packages=[], + ) + self.assertEqual(result.prod_count, 0) + self.assertEqual(result.dev_count, 0) + self.assertEqual(result.total_count, 0) + + +class TestDependencyImporter(unittest.TestCase): + """Tests for DependencyImporter class.""" + + def setUp(self): + self.importer = DependencyImporter() + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + import shutil + + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_temp_file(self, filename: str, content: str) -> str: + """Helper to create a temporary file.""" + file_path = os.path.join(self.temp_dir, filename) + with open(file_path, "w", 
encoding="utf-8") as f: + f.write(content) + return file_path + + +class TestEcosystemDetection(TestDependencyImporter): + """Tests for ecosystem detection.""" + + def test_detect_requirements_txt(self): + self.assertEqual( + self.importer.detect_ecosystem("requirements.txt"), + PackageEcosystem.PYTHON, + ) + + def test_detect_requirements_dev_txt(self): + self.assertEqual( + self.importer.detect_ecosystem("requirements-dev.txt"), + PackageEcosystem.PYTHON, + ) + + def test_detect_requirements_pattern(self): + self.assertEqual( + self.importer.detect_ecosystem("requirements-prod.txt"), + PackageEcosystem.PYTHON, + ) + + def test_detect_package_json(self): + self.assertEqual( + self.importer.detect_ecosystem("package.json"), + PackageEcosystem.NODE, + ) + + def test_detect_gemfile(self): + self.assertEqual( + self.importer.detect_ecosystem("Gemfile"), + PackageEcosystem.RUBY, + ) + + def test_detect_cargo_toml(self): + self.assertEqual( + self.importer.detect_ecosystem("Cargo.toml"), + PackageEcosystem.RUST, + ) + + def test_detect_go_mod(self): + self.assertEqual( + self.importer.detect_ecosystem("go.mod"), + PackageEcosystem.GO, + ) + + def test_detect_unknown(self): + self.assertEqual( + self.importer.detect_ecosystem("unknown.file"), + PackageEcosystem.UNKNOWN, + ) + + def test_detect_with_path(self): + self.assertEqual( + self.importer.detect_ecosystem("/some/path/requirements.txt"), + PackageEcosystem.PYTHON, + ) + + +class TestRequirementsTxtParsing(TestDependencyImporter): + """Tests for requirements.txt parsing.""" + + def test_parse_simple_packages(self): + content = """requests +flask +django +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(result.ecosystem, PackageEcosystem.PYTHON) + self.assertEqual(len(result.packages), 3) + self.assertEqual(result.packages[0].name, "requests") + self.assertEqual(result.packages[1].name, "flask") + 
self.assertEqual(result.packages[2].name, "django") + + def test_parse_with_versions(self): + content = """requests==2.28.0 +flask>=2.0.0 +django~=4.0 +numpy!=1.0.0 +pandas<2.0,>=1.0 +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 5) + self.assertEqual(result.packages[0].name, "requests") + self.assertEqual(result.packages[0].version, "==2.28.0") + + def test_parse_with_comments(self): + content = """# This is a comment +requests # inline comment should be handled +# Another comment +flask +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 2) + self.assertEqual(result.packages[0].name, "requests") + + def test_parse_with_extras(self): + content = """requests[security,socks]>=2.20.0 +celery[redis] +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 2) + self.assertEqual(result.packages[0].name, "requests") + self.assertIn("security", result.packages[0].extras) + self.assertIn("socks", result.packages[0].extras) + + def test_parse_with_environment_markers(self): + content = """pywin32; sys_platform == 'win32' +requests>=2.20.0; python_version >= "3.6" +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 2) + self.assertEqual(result.packages[0].name, "pywin32") + self.assertEqual(result.packages[1].name, "requests") + + def test_parse_empty_file(self): + content = "" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 0) + self.assertEqual(len(result.errors), 0) + + def test_parse_only_comments(self): + content = """# Just comments +# Nothing else +""" + 
file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 0) + + def test_parse_with_blank_lines(self): + content = """requests + +flask + +django +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 3) + + def test_parse_skips_options(self): + content = """--index-url https://pypi.org/simple +--extra-index-url https://custom.pypi.org +--trusted-host custom.pypi.org +requests +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertEqual(result.packages[0].name, "requests") + + def test_parse_recursive_includes(self): + # Create base requirements + base_content = """flask +django +""" + base_path = self._create_temp_file("base.txt", base_content) + + # Create main requirements with -r include + main_content = """-r base.txt +requests +""" + main_path = self._create_temp_file("requirements.txt", main_content) + + importer = DependencyImporter(base_path=self.temp_dir) + result = importer.parse(main_path) + + self.assertEqual(len(result.packages), 3) + names = [pkg.name for pkg in result.packages] + self.assertIn("flask", names) + self.assertIn("django", names) + self.assertIn("requests", names) + + def test_parse_missing_include_warning(self): + content = """-r nonexistent.txt +requests +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertTrue(len(result.warnings) > 0) + + def test_parse_dev_requirements_file(self): + content = """pytest +black +mypy +""" + file_path = self._create_temp_file("requirements-dev.txt", content) + result = self.importer.parse(file_path, include_dev=True) + + # Dev file packages should be in dev_packages + 
self.assertEqual(len(result.dev_packages), 3) + self.assertTrue(all(pkg.is_dev for pkg in result.dev_packages)) + + def test_parse_git_url(self): + content = """git+https://github.com/user/repo.git#egg=mypackage +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + # The #egg= fragment specifies the package name + self.assertEqual(result.packages[0].name, "mypackage") + + def test_parse_editable_install(self): + content = """-e git+https://github.com/user/myproject.git#egg=myproject +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertIsNotNone(result.packages[0].source) + + def test_file_not_found(self): + result = self.importer.parse("/nonexistent/requirements.txt") + + self.assertEqual(len(result.packages), 0) + self.assertTrue(len(result.errors) > 0) + self.assertIn("not found", result.errors[0].lower()) + + +class TestPackageJsonParsing(TestDependencyImporter): + """Tests for package.json parsing.""" + + def test_parse_simple_dependencies(self): + content = json.dumps( + { + "name": "test-project", + "dependencies": { + "express": "^4.18.0", + "lodash": "~4.17.21", + }, + } + ) + file_path = self._create_temp_file("package.json", content) + result = self.importer.parse(file_path) + + self.assertEqual(result.ecosystem, PackageEcosystem.NODE) + self.assertEqual(len(result.packages), 2) + names = [pkg.name for pkg in result.packages] + self.assertIn("express", names) + self.assertIn("lodash", names) + + def test_parse_dev_dependencies(self): + content = json.dumps( + { + "dependencies": {"express": "^4.18.0"}, + "devDependencies": { + "jest": "^29.0.0", + "typescript": "^5.0.0", + }, + } + ) + file_path = self._create_temp_file("package.json", content) + result = self.importer.parse(file_path, include_dev=True) + + 
self.assertEqual(len(result.packages), 1) + self.assertEqual(len(result.dev_packages), 2) + self.assertTrue(all(pkg.is_dev for pkg in result.dev_packages)) + + def test_parse_scoped_packages(self): + content = json.dumps( + { + "dependencies": { + "@types/node": "^18.0.0", + "@babel/core": "^7.0.0", + } + } + ) + file_path = self._create_temp_file("package.json", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 2) + names = [pkg.name for pkg in result.packages] + self.assertIn("@types/node", names) + self.assertIn("@babel/core", names) + + def test_parse_optional_dependencies(self): + content = json.dumps( + { + "dependencies": {"express": "^4.18.0"}, + "optionalDependencies": {"fsevents": "^2.3.0"}, + } + ) + file_path = self._create_temp_file("package.json", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 2) + optional_pkgs = [pkg for pkg in result.packages if pkg.is_optional] + self.assertEqual(len(optional_pkgs), 1) + self.assertEqual(optional_pkgs[0].name, "fsevents") + + def test_parse_peer_dependencies_warning(self): + content = json.dumps( + { + "dependencies": {"express": "^4.18.0"}, + "peerDependencies": {"react": "^18.0.0"}, + } + ) + file_path = self._create_temp_file("package.json", content) + result = self.importer.parse(file_path) + + self.assertTrue(len(result.warnings) > 0) + self.assertIn("peer", result.warnings[0].lower()) + + def test_parse_empty_dependencies(self): + content = json.dumps( + { + "name": "test-project", + "version": "1.0.0", + } + ) + file_path = self._create_temp_file("package.json", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 0) + self.assertEqual(len(result.errors), 0) + + def test_parse_invalid_json(self): + content = "{ invalid json }" + file_path = self._create_temp_file("package.json", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 0) + 
self.assertTrue(len(result.errors) > 0) + self.assertIn("json", result.errors[0].lower()) + + def test_parse_git_url_version(self): + content = json.dumps( + { + "dependencies": { + "mypackage": "git+https://github.com/user/repo.git", + } + } + ) + file_path = self._create_temp_file("package.json", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertIsNotNone(result.packages[0].source) + + +class TestGemfileParsing(TestDependencyImporter): + """Tests for Gemfile parsing.""" + + def test_parse_simple_gems(self): + content = """source 'https://rubygems.org' + +gem 'rails' +gem 'pg' +gem 'puma' +""" + file_path = self._create_temp_file("Gemfile", content) + result = self.importer.parse(file_path) + + self.assertEqual(result.ecosystem, PackageEcosystem.RUBY) + self.assertEqual(len(result.packages), 3) + names = [pkg.name for pkg in result.packages] + self.assertIn("rails", names) + self.assertIn("pg", names) + self.assertIn("puma", names) + + def test_parse_gems_with_versions(self): + content = """gem 'rails', '~> 7.0' +gem 'pg', '>= 1.0' +""" + file_path = self._create_temp_file("Gemfile", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 2) + self.assertEqual(result.packages[0].version, "~> 7.0") + + def test_parse_group_block(self): + content = """gem 'rails' + +group :development, :test do + gem 'rspec' + gem 'factory_bot' +end +""" + file_path = self._create_temp_file("Gemfile", content) + result = self.importer.parse(file_path, include_dev=True) + + self.assertEqual(len(result.packages), 1) + self.assertEqual(len(result.dev_packages), 2) + self.assertTrue(all(pkg.is_dev for pkg in result.dev_packages)) + + def test_parse_inline_group(self): + content = """gem 'rails' +gem 'rspec', group: :test +gem 'rubocop', group: [:development, :test] +""" + file_path = self._create_temp_file("Gemfile", content) + result = self.importer.parse(file_path, include_dev=True) + + 
self.assertEqual(len(result.packages), 1) + self.assertEqual(len(result.dev_packages), 2) + + def test_parse_path_gem(self): + content = """gem 'my_local_gem', path: './gems/local' +""" + file_path = self._create_temp_file("Gemfile", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertIn("path:", result.packages[0].source) + + def test_parse_git_gem(self): + content = """gem 'my_gem', git: 'https://github.com/user/repo.git' +""" + file_path = self._create_temp_file("Gemfile", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertIn("git:", result.packages[0].source) + + def test_parse_ruby_version_ignored(self): + content = """ruby '3.2.0' + +gem 'rails' +""" + file_path = self._create_temp_file("Gemfile", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertEqual(result.packages[0].name, "rails") + + def test_parse_comments_ignored(self): + content = """# Production gems +gem 'rails' # Main framework +# gem 'old_gem' # Commented out +""" + file_path = self._create_temp_file("Gemfile", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + + +class TestCargoTomlParsing(TestDependencyImporter): + """Tests for Cargo.toml parsing.""" + + def test_parse_simple_dependencies(self): + content = """[package] +name = "my_project" +version = "0.1.0" + +[dependencies] +serde = "1.0" +tokio = "1.0" +""" + file_path = self._create_temp_file("Cargo.toml", content) + result = self.importer.parse(file_path) + + self.assertEqual(result.ecosystem, PackageEcosystem.RUST) + self.assertEqual(len(result.packages), 2) + names = [pkg.name for pkg in result.packages] + self.assertIn("serde", names) + self.assertIn("tokio", names) + + def test_parse_inline_table(self): + content = """[dependencies] +tokio = { version = "1.0", features = ["full"] } +serde = { version = "1.0", 
features = ["derive"] } +""" + file_path = self._create_temp_file("Cargo.toml", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 2) + tokio = next(pkg for pkg in result.packages if pkg.name == "tokio") + self.assertEqual(tokio.version, "1.0") + self.assertIn("full", tokio.features) + + def test_parse_dev_dependencies(self): + content = """[dependencies] +serde = "1.0" + +[dev-dependencies] +criterion = "0.4" +proptest = "1.0" +""" + file_path = self._create_temp_file("Cargo.toml", content) + result = self.importer.parse(file_path, include_dev=True) + + self.assertEqual(len(result.packages), 1) + self.assertEqual(len(result.dev_packages), 2) + + def test_parse_build_dependencies(self): + content = """[dependencies] +serde = "1.0" + +[build-dependencies] +cc = "1.0" +""" + file_path = self._create_temp_file("Cargo.toml", content) + result = self.importer.parse(file_path, include_dev=True) + + self.assertEqual(len(result.packages), 1) + self.assertEqual(len(result.dev_packages), 1) + + def test_parse_path_dependency(self): + content = """[dependencies] +my_crate = { path = "../my_crate" } +""" + file_path = self._create_temp_file("Cargo.toml", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertIn("path:", result.packages[0].source) + + def test_parse_git_dependency(self): + content = """[dependencies] +my_crate = { git = "https://github.com/user/repo.git" } +""" + file_path = self._create_temp_file("Cargo.toml", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertIn("git:", result.packages[0].source) + + def test_parse_optional_dependency(self): + content = """[dependencies] +serde = { version = "1.0", optional = true } +""" + file_path = self._create_temp_file("Cargo.toml", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + 
self.assertTrue(result.packages[0].is_optional) + + def test_parse_ignores_other_sections(self): + content = """[package] +name = "test" +version = "0.1.0" + +[dependencies] +serde = "1.0" + +[profile.release] +opt-level = 3 +""" + file_path = self._create_temp_file("Cargo.toml", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + + +class TestGoModParsing(TestDependencyImporter): + """Tests for go.mod parsing.""" + + def test_parse_simple_requires(self): + content = """module example.com/mymodule + +go 1.21 + +require ( + github.com/gin-gonic/gin v1.9.0 + github.com/go-redis/redis/v8 v8.11.5 +) +""" + file_path = self._create_temp_file("go.mod", content) + result = self.importer.parse(file_path) + + self.assertEqual(result.ecosystem, PackageEcosystem.GO) + self.assertEqual(len(result.packages), 2) + names = [pkg.name for pkg in result.packages] + self.assertIn("github.com/gin-gonic/gin", names) + + def test_parse_single_require(self): + content = """module example.com/mymodule + +go 1.21 + +require github.com/gin-gonic/gin v1.9.0 +""" + file_path = self._create_temp_file("go.mod", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertEqual(result.packages[0].name, "github.com/gin-gonic/gin") + self.assertEqual(result.packages[0].version, "v1.9.0") + + def test_parse_indirect_dependencies(self): + content = """module example.com/mymodule + +go 1.21 + +require ( + github.com/gin-gonic/gin v1.9.0 + github.com/indirect/dep v1.0.0 // indirect +) +""" + file_path = self._create_temp_file("go.mod", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 2) + indirect_pkg = next(pkg for pkg in result.packages if "indirect" in pkg.name) + self.assertTrue(indirect_pkg.is_indirect) + + def test_parse_replace_directive_warning(self): + content = """module example.com/mymodule + +go 1.21 + +require github.com/gin-gonic/gin v1.9.0 + 
+replace github.com/old/pkg => github.com/new/pkg v1.0.0 +""" + file_path = self._create_temp_file("go.mod", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertTrue(len(result.warnings) > 0) + self.assertIn("replace", result.warnings[0].lower()) + + def test_parse_exclude_directive_warning(self): + content = """module example.com/mymodule + +go 1.21 + +require github.com/gin-gonic/gin v1.9.0 + +exclude github.com/bad/pkg v1.0.0 +""" + file_path = self._create_temp_file("go.mod", content) + result = self.importer.parse(file_path) + + self.assertTrue(len(result.warnings) > 0) + self.assertIn("exclude", result.warnings[0].lower()) + + def test_parse_pseudo_version(self): + content = """module example.com/mymodule + +go 1.21 + +require ( + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 +) +""" + file_path = self._create_temp_file("go.mod", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + self.assertIn("v0.0.0-", result.packages[0].version) + + +class TestDirectoryScan(TestDependencyImporter): + """Tests for directory scanning.""" + + def test_scan_empty_directory(self): + results = self.importer.scan_directory(self.temp_dir) + self.assertEqual(len(results), 0) + + def test_scan_single_file(self): + content = "requests\nflask" + self._create_temp_file("requirements.txt", content) + + importer = DependencyImporter(base_path=self.temp_dir) + results = importer.scan_directory() + + self.assertEqual(len(results), 1) + self.assertTrue(any("requirements.txt" in path for path in results.keys())) + + def test_scan_multiple_files(self): + self._create_temp_file("requirements.txt", "requests") + self._create_temp_file("package.json", json.dumps({"dependencies": {"express": "^4.0.0"}})) + + importer = DependencyImporter(base_path=self.temp_dir) + results = importer.scan_directory() + + self.assertEqual(len(results), 2) + + +class 
TestInstallCommands(TestDependencyImporter): + """Tests for install command generation.""" + + def test_get_python_install_command(self): + cmd = self.importer.get_install_command(PackageEcosystem.PYTHON, "requirements.txt") + self.assertEqual(cmd, "pip install -r requirements.txt") + + def test_get_node_install_command(self): + cmd = self.importer.get_install_command(PackageEcosystem.NODE) + self.assertEqual(cmd, "npm install") + + def test_get_ruby_install_command(self): + cmd = self.importer.get_install_command(PackageEcosystem.RUBY) + self.assertEqual(cmd, "bundle install") + + def test_get_rust_install_command(self): + cmd = self.importer.get_install_command(PackageEcosystem.RUST) + self.assertEqual(cmd, "cargo build") + + def test_get_go_install_command(self): + cmd = self.importer.get_install_command(PackageEcosystem.GO) + self.assertEqual(cmd, "go mod download") + + def test_get_unknown_install_command(self): + cmd = self.importer.get_install_command(PackageEcosystem.UNKNOWN) + self.assertIsNone(cmd) + + def test_get_install_commands_for_results(self): + self._create_temp_file("requirements.txt", "requests") + self._create_temp_file("package.json", json.dumps({"dependencies": {"express": "^4.0.0"}})) + + importer = DependencyImporter(base_path=self.temp_dir) + results = importer.scan_directory() + commands = importer.get_install_commands_for_results(results) + + self.assertEqual(len(commands), 2) + self.assertTrue(all("command" in cmd for cmd in commands)) + self.assertTrue(all("description" in cmd for cmd in commands)) + + +class TestFormatPackageList(unittest.TestCase): + """Tests for format_package_list helper.""" + + def test_format_empty_list(self): + result = format_package_list([]) + self.assertEqual(result, "(none)") + + def test_format_single_package(self): + packages = [Package(name="requests", version="2.28.0")] + result = format_package_list(packages) + self.assertEqual(result, "requests@2.28.0") + + def test_format_multiple_packages(self): + 
packages = [ + Package(name="requests"), + Package(name="flask"), + Package(name="django"), + ] + result = format_package_list(packages) + self.assertEqual(result, "requests, flask, django") + + def test_format_with_truncation(self): + packages = [Package(name=f"pkg{i}") for i in range(15)] + result = format_package_list(packages, max_display=10) + self.assertIn("(+5 more)", result) + + +class TestEdgeCases(TestDependencyImporter): + """Tests for edge cases and error handling.""" + + def test_parse_unknown_file_type(self): + content = "some content" + file_path = self._create_temp_file("unknown.xyz", content) + result = self.importer.parse(file_path) + + self.assertTrue(len(result.errors) > 0) + + def test_parse_with_unicode(self): + content = """# Comment with unicode: café ñ 日本語 +requests +""" + file_path = self._create_temp_file("requirements.txt", content) + result = self.importer.parse(file_path) + + self.assertEqual(len(result.packages), 1) + + def test_circular_include_prevention(self): + # Create files that include each other + content_a = """-r b.txt +requests +""" + content_b = """-r a.txt +flask +""" + self._create_temp_file("a.txt", content_a) + self._create_temp_file("b.txt", content_b) + + # Rename to requirements pattern + os.rename( + os.path.join(self.temp_dir, "a.txt"), + os.path.join(self.temp_dir, "requirements.txt"), + ) + os.rename( + os.path.join(self.temp_dir, "b.txt"), + os.path.join(self.temp_dir, "requirements-base.txt"), + ) + + importer = DependencyImporter(base_path=self.temp_dir) + # Should not infinite loop + result = importer.parse(os.path.join(self.temp_dir, "requirements.txt")) + + # Should have warnings about circular include + self.assertTrue(len(result.packages) >= 1) + + +class TestDependencyFilesMapping(unittest.TestCase): + """Tests for DEPENDENCY_FILES constant.""" + + def test_all_ecosystems_covered(self): + ecosystems_in_mapping = set(DEPENDENCY_FILES.values()) + # Should have Python, Node, Ruby, Rust, Go + 
self.assertIn(PackageEcosystem.PYTHON, ecosystems_in_mapping) + self.assertIn(PackageEcosystem.NODE, ecosystems_in_mapping) + self.assertIn(PackageEcosystem.RUBY, ecosystems_in_mapping) + self.assertIn(PackageEcosystem.RUST, ecosystems_in_mapping) + self.assertIn(PackageEcosystem.GO, ecosystems_in_mapping) + + def test_install_commands_for_all_ecosystems(self): + for ecosystem in [ + PackageEcosystem.PYTHON, + PackageEcosystem.NODE, + PackageEcosystem.RUBY, + PackageEcosystem.RUST, + PackageEcosystem.GO, + ]: + self.assertIn(ecosystem, INSTALL_COMMANDS) + + +if __name__ == "__main__": + unittest.main()