diff --git a/README.md b/README.md index 9a5d4ab..d6b5a2c 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,27 @@ # ShellMCP -**Expose Shell Commands as MCP tools** +**Expose Shell Commands as MCP Tools** -ShellMCP is a powerful tool that allows you to easily create Model Context Protocol (MCP) servers by exposing shell commands as structured tools. Instead of granting AI agents full shell access (which poses security risks), ShellMCP enables you to expose only the specific commands you trust, allowing agents to work autonomously with a predefined set of safe operations. Define your tools in YAML, and ShellMCP generates a complete FastMCP server for you. +ShellMCP is a powerful tool that allows you to easily create Model Context Protocol (MCP) servers by exposing shell commands as structured tools. Instead of granting AI agents full shell access (which poses security risks), ShellMCP enables you to expose only the specific commands you trust, allowing agents to work autonomously with a predefined set of safe operations. + +Define your tools in YAML, and ShellMCP generates a complete FastMCP server for you. ## Quick Start ```bash -# Install +# Install ShellMCP pip install shellmcp -# Create a new server +# Create a new server configuration shellmcp new --name "my-server" --desc "My custom MCP server" -# Add a tool +# Add a tool interactively shellmcp add-tool my-server.yml -# Generate the server +# Validate the configuration +shellmcp validate my-server.yml + +# Generate the FastMCP server shellmcp generate my-server.yml ``` @@ -28,6 +33,8 @@ shellmcp generate my-server.yml - āœ… **Validation**: Built-in configuration validation and error checking - šŸŽÆ **FastMCP Integration**: Generates production-ready FastMCP servers - šŸ“¦ **Complete Output**: Includes server code, requirements, and documentation +- šŸ”’ **Security-First**: Expose only trusted commands to AI agents +- šŸŽØ **Flexible**: Support for tools, resources, and prompts with reusable arguments ## Example @@ -92,10 +99,50 @@ prompts: type: string ``` -## Installation +## CLI Commands + +ShellMCP provides several commands to help you create and manage MCP servers: + +### `shellmcp new` +Create a new server configuration file. ```bash -pip install shellmcp +shellmcp new --name "my-server" --desc "My custom MCP server" --version "1.0.0" +``` + +### `shellmcp add-tool` +Add a new tool to an existing configuration. + +```bash +shellmcp add-tool my-server.yml --name "list-files" --cmd "ls -la {{path}}" --desc "List files in directory" +``` + +### `shellmcp add-resource` +Add a new resource to an existing configuration. + +```bash +shellmcp add-resource my-server.yml --name "system-info" --uri "file:///tmp/system-info.txt" --resource-name "System Information" +``` + +### `shellmcp add-prompt` +Add a new prompt to an existing configuration. + +```bash +shellmcp add-prompt my-server.yml --name "file-analysis" --prompt-name "File Analysis Assistant" +``` + +### `shellmcp validate` +Validate a YAML configuration file. + +```bash +shellmcp validate my-server.yml --verbose +``` + +### `shellmcp generate` +Generate a FastMCP server from YAML configuration. + +```bash +shellmcp generate my-server.yml --output-dir ./output --verbose ``` ## Documentation diff --git a/pyproject.toml b/pyproject.toml index 7f9ddb4..98e48da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "shellmcp" -version = "0.1.0" +version = "1.0.0" description = "Expose Shell Commands as MCP tools" readme = "README.md" requires-python = ">=3.8" diff --git a/shellmcp/__init__.py b/shellmcp/__init__.py index 75aaddb..f729aa2 100644 --- a/shellmcp/__init__.py +++ b/shellmcp/__init__.py @@ -1,3 +1,3 @@ """ShellMCP - Expose Shell Commands as MCP tools.""" -__version__ = "0.1.0" \ No newline at end of file +__version__ = "1.0.0" \ No newline at end of file diff --git a/shellmcp/cli.py b/shellmcp/cli.py index e8a1c41..96249c5 100644 --- a/shellmcp/cli.py +++ b/shellmcp/cli.py @@ -1,13 +1,37 @@ -"""Command-line interface for YML configuration validation using Google Fire.""" +"""Command-line interface for ShellMCP.""" -import fire import sys from pathlib import Path from typing import Dict, Any, Optional, List -from .parser import YMLParser + +import fire + from .generator import FastMCPGenerator -from .models import YMLConfig, ServerConfig, ToolConfig, ResourceConfig, PromptConfig, ToolArgument, ArgumentDefinition -from .utils import get_input, get_choice, get_yes_no, save_config, load_or_create_config +from .models import ( + ArgumentDefinition, + PromptConfig, + ResourceConfig, + ServerConfig, + ToolArgument, + ToolConfig, + YMLConfig, +) +from .parser import YMLParser +from .utils import get_choice, get_input, get_yes_no, load_or_create_config, save_config + + +def _handle_error(error_msg: str, verbose: bool = False, exception: Exception = None) -> int: + """Common error handling for CLI functions.""" + print(f"āŒ {error_msg}", file=sys.stderr) + if verbose and exception: + import traceback + traceback.print_exc() + return 1 + + +def _check_file_exists(file_path: str) -> bool: + """Check if file exists and return True/False.""" + return Path(file_path).exists() def validate(config_file: str, verbose: bool = False) -> int: @@ -21,27 +45,17 @@ def validate(config_file: str, verbose: bool = False) -> int: Returns: Exit code (0 for success, 1 for failure) """ - parser = YMLParser() - try: - # Check if file exists - if not Path(config_file).exists(): - print(f"āŒ Error: File '{config_file}' not found", file=sys.stderr) - return 1 + if not _check_file_exists(config_file): + return _handle_error(f"File '{config_file}' not found") - # Load and validate configuration + parser = YMLParser() config = parser.load_from_file(config_file) - _output_validation(config_file, config, parser, verbose) - return 0 except Exception as e: - print(f"āŒ Error validating configuration: {e}", file=sys.stderr) - if verbose: - import traceback - traceback.print_exc() - return 1 + return _handle_error(f"Error validating configuration: {e}", verbose, e) def _output_validation(config_file: str, config, parser, verbose: bool): @@ -85,15 +99,11 @@ def generate(config_file: str, output_dir: str = None, verbose: bool = False) -> Returns: Exit code (0 for success, 1 for failure) """ - generator = FastMCPGenerator() - try: - # Check if file exists - if not Path(config_file).exists(): - print(f"āŒ Error: File '{config_file}' not found", file=sys.stderr) - return 1 + if not _check_file_exists(config_file): + return _handle_error(f"File '{config_file}' not found") - # Load and validate configuration first + # Load and validate configuration parser = YMLParser() config = parser.load_from_file(config_file) @@ -106,26 +116,20 @@ def generate(config_file: str, output_dir: str = None, verbose: bool = False) -> # Determine output directory if output_dir is None: - # Create a subdirectory matching the server name by default config_dir = Path(config_file).parent server_name = config.server.name.replace('-', '_').replace(' ', '_').lower() output_dir = config_dir / server_name else: output_dir = Path(output_dir) - # Create the output directory output_dir.mkdir(parents=True, exist_ok=True) - # Generate server file + # Generate files + generator = FastMCPGenerator() server_file = generator.generate_server(config_file, str(output_dir / f"{config.server.name.replace('-', '_')}_server.py")) - - # Generate requirements.txt requirements_file = generator.generate_requirements(str(output_dir / "requirements.txt")) - - # Generate README.md readme_file = generator.generate_readme(config, str(output_dir / "README.md")) - print(f"āœ… FastMCP server generated successfully!") print(f"šŸ“ Output directory: {output_dir}") print(f"šŸ Server file: {server_file}") @@ -135,21 +139,14 @@ def generate(config_file: str, output_dir: str = None, verbose: bool = False) -> if verbose: print(f"\nšŸš€ To run the server:") print(f" cd {output_dir}") - print(f" # Create and activate virtual environment (recommended):") - print(f" python3 -m venv venv") - print(f" source venv/bin/activate") + print(f" python3 -m venv venv && source venv/bin/activate") print(f" pip install -r requirements.txt") print(f" python {Path(server_file).name}") - return 0 except Exception as e: - print(f"āŒ Error generating server: {e}", file=sys.stderr) - if verbose: - import traceback - traceback.print_exc() - return 1 + return _handle_error(f"Error generating server: {e}", verbose, e) def new(name: str = None, desc: str = None, version: str = None, output_file: str = None) -> int: @@ -166,39 +163,22 @@ def new(name: str = None, desc: str = None, version: str = None, output_file: st Exit code (0 for success, 1 for failure) """ try: - # Get server name - if not name: - name = get_input("Server name", required=True) - - # Get server description - if not desc: - desc = get_input("Server description", required=True) - - # Get version - if not version: - version = get_input("Server version", default="1.0.0") + # Get required inputs + name = name or get_input("Server name", required=True) + desc = desc or get_input("Server description", required=True) + version = version or get_input("Server version", default="1.0.0") # Determine output file - if not output_file: - output_file = f"{name.replace(' ', '_').lower()}.yml" + output_file = output_file or f"{name.replace(' ', '_').lower()}.yml" # Check if file already exists if Path(output_file).exists(): - overwrite = get_yes_no(f"File '{output_file}' already exists. Overwrite?", default=False) - if not overwrite: + if not get_yes_no(f"File '{output_file}' already exists. Overwrite?", default=False): print("āŒ Operation cancelled") return 1 - # Create server configuration - server_config = ServerConfig( - name=name, - desc=desc, - version=version - ) - - config = YMLConfig(server=server_config) - - # Save configuration + # Create and save configuration + config = YMLConfig(server=ServerConfig(name=name, desc=desc, version=version)) save_config(config, output_file) print(f"āœ… Created new server configuration: {output_file}") @@ -212,8 +192,7 @@ def new(name: str = None, desc: str = None, version: str = None, output_file: st return 0 except Exception as e: - print(f"āŒ Error creating server: {e}", file=sys.stderr) - return 1 + return _handle_error(f"Error creating server: {e}", exception=e) def _collect_tool_argument(config: YMLConfig, existing_args: List[str] = None) -> Optional[ToolArgument]: @@ -365,36 +344,24 @@ def add_tool(config_file: str, name: str = None, cmd: str = None, desc: str = No Exit code (0 for success, 1 for failure) """ try: - # Load existing configuration config = load_or_create_config(config_file) - # Get tool name - if not name: - name = get_input("Tool name", required=True) + # Get tool details + name = name or get_input("Tool name", required=True) + cmd = cmd or get_input("Shell command (supports Jinja2 templates like {{arg_name}})", required=True) + desc = desc or get_input("Tool description", required=True) + help_cmd = help_cmd or get_input("Help command (optional, press Enter to skip)", required=False) # Check if tool already exists if config.tools and name in config.tools: - overwrite = get_yes_no(f"Tool '{name}' already exists. Overwrite?", default=False) - if not overwrite: + if not get_yes_no(f"Tool '{name}' already exists. Overwrite?", default=False): print("āŒ Operation cancelled") return 1 - # Get tool command - if not cmd: - cmd = get_input("Shell command (supports Jinja2 templates like {{arg_name}})", required=True) - - # Get tool description - if not desc: - desc = get_input("Tool description", required=True) - - # Get help command (optional) - if not help_cmd: - help_cmd = get_input("Help command (optional, press Enter to skip)", required=False) - # Collect tool arguments arguments = _collect_tool_arguments(config) - # Create tool configuration + # Create and add tool configuration tool_config = ToolConfig( cmd=cmd, desc=desc, @@ -402,12 +369,9 @@ def add_tool(config_file: str, name: str = None, cmd: str = None, desc: str = No args=arguments if arguments else None ) - # Add tool to configuration if not config.tools: config.tools = {} config.tools[name] = tool_config - - # Save configuration save_config(config, config_file) print(f"āœ… Added tool '{name}' to {config_file}") @@ -432,8 +396,7 @@ def add_tool(config_file: str, name: str = None, cmd: str = None, desc: str = No return 0 except Exception as e: - print(f"āŒ Error adding tool: {e}", file=sys.stderr) - return 1 + return _handle_error(f"Error adding tool: {e}", exception=e) def add_resource(config_file: str, name: str = None, uri: str = None, resource_name: str = None, @@ -454,79 +417,43 @@ def add_resource(config_file: str, name: str = None, uri: str = None, resource_n Exit code (0 for success, 1 for failure) """ try: - # Load existing configuration config = load_or_create_config(config_file) - # Get resource name - if not name: - name = get_input("Resource name (key)", required=True) + # Get resource details + name = name or get_input("Resource name (key)", required=True) + uri = uri or get_input("Resource URI", required=True) + resource_name = resource_name or get_input("Resource display name", default=name) + description = description or get_input("Resource description", required=False) + content_type = content_type or get_input("MIME type (optional, e.g., text/plain, application/json)", required=False) + content_source = content_source or get_choice("How will the resource content be provided?", ["cmd", "file", "text"], default="cmd") # Check if resource already exists if config.resources and name in config.resources: - overwrite = get_yes_no(f"Resource '{name}' already exists. Overwrite?", default=False) - if not overwrite: + if not get_yes_no(f"Resource '{name}' already exists. Overwrite?", default=False): print("āŒ Operation cancelled") return 1 - # Get URI - if not uri: - uri = get_input("Resource URI", required=True) - - # Get resource display name - if not resource_name: - resource_name = get_input("Resource display name", default=name) - - # Get description - if not description: - description = get_input("Resource description", required=False) - - # Get MIME type - if not content_type: - content_type = get_input("MIME type (optional, e.g., text/plain, application/json)", required=False) - - # Get content source type - if not content_source: - content_source = get_choice( - "How will the resource content be provided?", - ["cmd", "file", "text"], - default="cmd" - ) - # Get content based on source type - if content_source == "cmd": - content = get_input("Shell command to generate content (supports Jinja2 templates)", required=True) - resource_config = ResourceConfig( - uri=uri, - name=resource_name, - description=description, - mime_type=content_type, - cmd=content - ) - elif content_source == "file": - content = get_input("File path to read content from", required=True) - resource_config = ResourceConfig( - uri=uri, - name=resource_name, - description=description, - mime_type=content_type, - file=content - ) - else: # text - content = get_input("Direct text content", required=True) - resource_config = ResourceConfig( - uri=uri, - name=resource_name, - description=description, - mime_type=content_type, - text=content - ) + content = get_input( + "Shell command to generate content (supports Jinja2 templates)" if content_source == "cmd" + else "File path to read content from" if content_source == "file" + else "Direct text content", + required=True + ) + + # Create resource configuration + resource_config = ResourceConfig( + uri=uri, + name=resource_name, + description=description, + mime_type=content_type, + **{content_source: content} + ) # Add resource to configuration if not config.resources: config.resources = {} config.resources[name] = resource_config - - # Save configuration save_config(config, config_file) print(f"āœ… Added resource '{name}' to {config_file}") @@ -542,8 +469,7 @@ def add_resource(config_file: str, name: str = None, uri: str = None, resource_n return 0 except Exception as e: - print(f"āŒ Error adding resource: {e}", file=sys.stderr) - return 1 + return _handle_error(f"Error adding resource: {e}", exception=e) def add_prompt(config_file: str, name: str = None, prompt_name: str = None, description: str = None, @@ -562,65 +488,39 @@ def add_prompt(config_file: str, name: str = None, prompt_name: str = None, desc Exit code (0 for success, 1 for failure) """ try: - # Load existing configuration config = load_or_create_config(config_file) - # Get prompt name - if not name: - name = get_input("Prompt name (key)", required=True) + # Get prompt details + name = name or get_input("Prompt name (key)", required=True) + prompt_name = prompt_name or get_input("Prompt display name", default=name) + description = description or get_input("Prompt description", required=False) + content_source = content_source or get_choice("How will the prompt content be provided?", ["cmd", "file", "template"], default="template") # Check if prompt already exists if config.prompts and name in config.prompts: - overwrite = get_yes_no(f"Prompt '{name}' already exists. Overwrite?", default=False) - if not overwrite: + if not get_yes_no(f"Prompt '{name}' already exists. Overwrite?", default=False): print("āŒ Operation cancelled") return 1 - # Get prompt display name - if not prompt_name: - prompt_name = get_input("Prompt display name", default=name) - - # Get description - if not description: - description = get_input("Prompt description", required=False) - - # Get content source type - if not content_source: - content_source = get_choice( - "How will the prompt content be provided?", - ["cmd", "file", "template"], - default="template" - ) - # Get content based on source type - if content_source == "cmd": - content = get_input("Shell command to generate prompt content (supports Jinja2 templates)", required=True) - prompt_config = PromptConfig( - name=prompt_name, - description=description, - cmd=content - ) - elif content_source == "file": - content = get_input("File path to read prompt content from", required=True) - prompt_config = PromptConfig( - name=prompt_name, - description=description, - file=content - ) - else: # template - content = get_input("Jinja2 template content for the prompt", required=True) - prompt_config = PromptConfig( - name=prompt_name, - description=description, - template=content - ) + content = get_input( + "Shell command to generate prompt content (supports Jinja2 templates)" if content_source == "cmd" + else "File path to read prompt content from" if content_source == "file" + else "Jinja2 template content for the prompt", + required=True + ) + + # Create prompt configuration + prompt_config = PromptConfig( + name=prompt_name, + description=description, + **{content_source: content} + ) # Add prompt to configuration if not config.prompts: config.prompts = {} config.prompts[name] = prompt_config - - # Save configuration save_config(config, config_file) print(f"āœ… Added prompt '{name}' to {config_file}") @@ -633,8 +533,7 @@ def add_prompt(config_file: str, name: str = None, prompt_name: str = None, desc return 0 except Exception as e: - print(f"āŒ Error adding prompt: {e}", file=sys.stderr) - return 1 + return _handle_error(f"Error adding prompt: {e}", exception=e) def main(): diff --git a/shellmcp/generator.py b/shellmcp/generator.py index b71a19b..eb47a56 100644 --- a/shellmcp/generator.py +++ b/shellmcp/generator.py @@ -4,10 +4,12 @@ import subprocess import tempfile from pathlib import Path -from typing import Dict, Any, Optional, List -from jinja2 import Template, Environment, FileSystemLoader -from .parser import YMLParser +from typing import Any, Dict, List, Optional + +from jinja2 import Environment, FileSystemLoader, Template + from .models import YMLConfig +from .parser import YMLParser from .template_utils import get_jinja_filters @@ -37,26 +39,29 @@ def generate_server(self, config_file: str, output_file: Optional[str] = None) - Returns: Path to the generated server file """ - # Load and validate configuration - config = self.parser.load_from_file(config_file) - - # Generate server code - server_code = self._generate_server_code(config) - - # Determine output path - if output_file is None: - config_path = Path(config_file) - output_file = config_path.parent / f"{config.server.name.replace('-', '_')}_server.py" - - # Ensure the output directory exists - output_path = Path(output_file) - output_path.parent.mkdir(parents=True, exist_ok=True) - - # Write server file - with open(output_file, 'w', encoding='utf-8') as f: - f.write(server_code) - - return str(output_file) + try: + # Load and validate configuration + config = self.parser.load_from_file(config_file) + + # Generate server code + server_code = self._generate_server_code(config) + + # Determine output path + if output_file is None: + config_path = Path(config_file) + output_file = config_path.parent / f"{config.server.name.replace('-', '_')}_server.py" + + # Ensure the output directory exists + output_path = Path(output_file) + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Write server file + with open(output_file, 'w', encoding='utf-8') as f: + f.write(server_code) + + return str(output_file) + except Exception as e: + raise RuntimeError(f"Failed to generate server: {e}") def _generate_server_code(self, config: YMLConfig) -> str: """Generate FastMCP server code from configuration using Jinja2 templates.""" diff --git a/shellmcp/models.py b/shellmcp/models.py index 6c51b59..d706cf5 100644 --- a/shellmcp/models.py +++ b/shellmcp/models.py @@ -1,8 +1,9 @@ """Pydantic models for YAML configuration parsing.""" +import re from typing import Any, Dict, List, Literal, Optional, Union + from pydantic import BaseModel, Field, field_validator, model_validator -import re class ArgumentDefinition(BaseModel): @@ -230,20 +231,20 @@ def get_resolved_arguments(self, tool_name: str) -> List[ToolArgument]: """Get fully resolved arguments for a tool, expanding references.""" if not self.tools or tool_name not in self.tools: return [] - - tool = self.tools[tool_name] - if not tool.args: + return self._resolve_arguments(self.tools[tool_name].args) + + def _resolve_arguments(self, args: Optional[List[ToolArgument]]) -> List[ToolArgument]: + """Helper method to resolve argument references.""" + if not args: return [] resolved_args = [] - for arg in tool.args: + for arg in args: if arg.ref: - # Resolve reference if not self.args or arg.ref not in self.args: raise ValueError(f"Reference '{arg.ref}' not found in args section") ref_arg = self.args[arg.ref] - # Create a new argument with the reference properties resolved_arg = ToolArgument( name=arg.name, help=ref_arg.help, @@ -262,79 +263,32 @@ def get_resolved_resource_arguments(self, resource_name: str) -> List[ToolArgume """Get fully resolved arguments for a resource, expanding references.""" if not self.resources or resource_name not in self.resources: return [] - - resource = self.resources[resource_name] - if not resource.args: - return [] - - resolved_args = [] - for arg in resource.args: - if arg.ref: - # Resolve reference - if not self.args or arg.ref not in self.args: - raise ValueError(f"Reference '{arg.ref}' not found in args section") - - ref_arg = self.args[arg.ref] - # Create a new argument with the reference properties - resolved_arg = ToolArgument( - name=arg.name, - help=ref_arg.help, - type=ref_arg.type, - default=ref_arg.default, - choices=ref_arg.choices, - pattern=ref_arg.pattern - ) - resolved_args.append(resolved_arg) - else: - resolved_args.append(arg) - - return resolved_args + return self._resolve_arguments(self.resources[resource_name].args) def get_resolved_prompt_arguments(self, prompt_name: str) -> List[ToolArgument]: """Get fully resolved arguments for a prompt, expanding references.""" if not self.prompts or prompt_name not in self.prompts: return [] - - prompt = self.prompts[prompt_name] - if not prompt.args: - return [] - - resolved_args = [] - for arg in prompt.args: - if arg.ref: - # Resolve reference - if not self.args or arg.ref not in self.args: - raise ValueError(f"Reference '{arg.ref}' not found in args section") - - ref_arg = self.args[arg.ref] - # Create a new argument with the reference properties - resolved_arg = ToolArgument( - name=arg.name, - help=ref_arg.help, - type=ref_arg.type, - default=ref_arg.default, - choices=ref_arg.choices, - pattern=ref_arg.pattern - ) - resolved_args.append(resolved_arg) - else: - resolved_args.append(arg) - - return resolved_args + return self._resolve_arguments(self.prompts[prompt_name].args) - def validate_jinja2_template(self, tool_name: str) -> bool: - """Validate that the tool's command contains valid Jinja2 template syntax.""" - if not self.tools or tool_name not in self.tools: - return False + def _validate_template(self, template_str: str) -> bool: + """Helper method to validate Jinja2 template syntax.""" + if not template_str: + return True # No template to validate try: from jinja2 import Template - template_str = self.tools[tool_name].cmd Template(template_str) return True except Exception: return False + def validate_jinja2_template(self, tool_name: str) -> bool: + """Validate that the tool's command contains valid Jinja2 template syntax.""" + if not self.tools or tool_name not in self.tools: + return False + return self._validate_template(self.tools[tool_name].cmd) + def validate_resource_jinja2_template(self, resource_name: str) -> bool: """Validate that the resource's content contains valid Jinja2 template syntax.""" if not self.resources or resource_name not in self.resources: @@ -342,15 +296,7 @@ def validate_resource_jinja2_template(self, resource_name: str) -> bool: resource = self.resources[resource_name] template_str = resource.cmd or resource.file or resource.text - if not template_str: - return True # No template to validate - - try: - from jinja2 import Template - Template(template_str) - return True - except Exception: - return False + return self._validate_template(template_str) def validate_prompt_jinja2_template(self, prompt_name: str) -> bool: """Validate that the prompt's content contains valid Jinja2 template syntax.""" @@ -359,34 +305,28 @@ def validate_prompt_jinja2_template(self, prompt_name: str) -> bool: prompt = self.prompts[prompt_name] template_str = prompt.cmd or prompt.file or prompt.template - if not template_str: - return True # No template to validate - - try: - from jinja2 import Template - Template(template_str) - return True - except Exception: - return False + return self._validate_template(template_str) - def get_template_variables(self, tool_name: str) -> List[str]: - """Extract template variables from a tool's command.""" - if not self.tools or tool_name not in self.tools: + def _extract_template_variables(self, template_str: str) -> List[str]: + """Helper method to extract template variables from a Jinja2 template.""" + if not template_str: return [] try: from jinja2 import Template, meta - template_str = self.tools[tool_name].cmd template = Template(template_str) - - # Get all variables used in the template ast = template.environment.parse(template_str) variables = meta.find_undeclared_variables(ast) - return list(variables) except Exception: return [] + def get_template_variables(self, tool_name: str) -> List[str]: + """Extract template variables from a tool's command.""" + if not self.tools or tool_name not in self.tools: + return [] + return self._extract_template_variables(self.tools[tool_name].cmd) + def get_resource_template_variables(self, resource_name: str) -> List[str]: """Extract template variables from a resource's command, file, or text.""" if not self.resources or resource_name not in self.resources: @@ -394,20 +334,7 @@ def get_resource_template_variables(self, resource_name: str) -> List[str]: resource = self.resources[resource_name] template_str = resource.cmd or resource.file or resource.text - if not template_str: - return [] - - try: - from jinja2 import Template, meta - template = Template(template_str) - - # Get all variables used in the template - ast = template.environment.parse(template_str) - variables = meta.find_undeclared_variables(ast) - - return list(variables) - except Exception: - return [] + return self._extract_template_variables(template_str) def get_prompt_template_variables(self, prompt_name: str) -> List[str]: """Extract template variables from a prompt's command, file, or template.""" @@ -416,19 +343,6 @@ def get_prompt_template_variables(self, prompt_name: str) -> List[str]: prompt = self.prompts[prompt_name] template_str = prompt.cmd or prompt.file or prompt.template - if not template_str: - return [] - - try: - from jinja2 import Template, meta - template = Template(template_str) - - # Get all variables used in the template - ast = template.environment.parse(template_str) - variables = meta.find_undeclared_variables(ast) - - return list(variables) - except Exception: - return [] + return self._extract_template_variables(template_str) model_config = {"extra": "forbid"} # Prevent additional fields not defined in the model \ No newline at end of file diff --git a/shellmcp/parser.py b/shellmcp/parser.py index be9bb96..ba325d5 100644 --- a/shellmcp/parser.py +++ b/shellmcp/parser.py @@ -1,8 +1,10 @@ """YAML configuration parser and validator.""" -import yaml -from typing import Dict, Any, Optional from pathlib import Path +from typing import Any, Dict, Optional + +import yaml + from .models import YMLConfig diff --git a/shellmcp/utils.py b/shellmcp/utils.py index c860196..882f499 100644 --- a/shellmcp/utils.py +++ b/shellmcp/utils.py @@ -1,12 +1,14 @@ """Utility functions for user input and file operations.""" -import yaml import sys from pathlib import Path from typing import List, Optional + import questionary +import yaml + +from .models import ServerConfig, YMLConfig from .parser import YMLParser -from .models import YMLConfig, ServerConfig def get_input(prompt: str, default: str = None, required: bool = True) -> str: @@ -25,13 +27,16 @@ def get_input(prompt: str, default: str = None, required: bool = True) -> str: ).ask() if result is None: - print("Operation cancelled by user.") + print("āŒ Operation cancelled by user.") sys.exit(0) return result.strip() if result else (default or "") except KeyboardInterrupt: - print("\nOperation cancelled by user.") + print("\nāŒ Operation cancelled by user.") sys.exit(0) + except Exception as e: + print(f"āŒ Error getting input: {e}") + sys.exit(1) def get_choice(prompt: str, choices: List[str], default: str = None) -> str: @@ -44,13 +49,16 @@ def get_choice(prompt: str, choices: List[str], default: str = None) -> str: ).ask() if result is None: - print("Operation cancelled by user.") + print("āŒ Operation cancelled by user.") sys.exit(0) return result except KeyboardInterrupt: - print("\nOperation cancelled by user.") + print("\nāŒ Operation cancelled by user.") sys.exit(0) + except Exception as e: + print(f"āŒ Error getting choice: {e}") + sys.exit(1) def get_yes_no(prompt: str, default: bool = None) -> bool: @@ -62,28 +70,37 @@ def get_yes_no(prompt: str, default: bool = None) -> bool: ).ask() if result is None: - print("Operation cancelled by user.") + print("āŒ Operation cancelled by user.") sys.exit(0) return result except KeyboardInterrupt: - print("\nOperation cancelled by user.") + print("\nāŒ Operation cancelled by user.") sys.exit(0) + except Exception as e: + print(f"āŒ Error getting yes/no input: {e}") + sys.exit(1) def save_config(config: YMLConfig, file_path: str) -> None: """Save configuration to YAML file.""" - config_dict = config.model_dump(exclude_none=True) - - with open(file_path, 'w', encoding='utf-8') as f: - yaml.dump(config_dict, f, default_flow_style=False, sort_keys=False, indent=2) + try: + config_dict = config.model_dump(exclude_none=True) + + with open(file_path, 'w', encoding='utf-8') as f: + yaml.dump(config_dict, f, default_flow_style=False, sort_keys=False, indent=2) + except Exception as e: + raise IOError(f"Failed to save configuration to {file_path}: {e}") def load_or_create_config(config_file: str) -> YMLConfig: """Load existing config or create new one.""" - if Path(config_file).exists(): - parser = YMLParser() - return parser.load_from_file(config_file) - else: - # Create minimal config - return YMLConfig(server=ServerConfig(name="", desc="")) \ No newline at end of file + try: + if Path(config_file).exists(): + parser = YMLParser() + return parser.load_from_file(config_file) + else: + # Create minimal config + return YMLConfig(server=ServerConfig(name="", desc="")) + except Exception as e: + raise ValueError(f"Failed to load configuration from {config_file}: {e}") \ No newline at end of file diff --git a/tests/test_models.py b/tests/test_models.py index 804f1df..3f898ae 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -2,8 +2,13 @@ import pytest from pydantic import ValidationError + from shellmcp.models import ( - ArgumentDefinition, ToolArgument, ServerConfig, ToolConfig, YMLConfig + ArgumentDefinition, + ServerConfig, + ToolArgument, + ToolConfig, + YMLConfig, ) diff --git a/tests/test_parser.py b/tests/test_parser.py index 2289dca..98e1e16 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -1,8 +1,10 @@ """Tests for YML parser.""" -import pytest import tempfile from pathlib import Path + +import pytest + from shellmcp.parser import YMLParser diff --git a/update_help_cmd.py b/update_help_cmd.py deleted file mode 100644 index b3bc35c..0000000 --- a/update_help_cmd.py +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python3 - -import subprocess -import tempfile -import os - -# Read the current template -with open('/home/blake/p/shellmcp/shellmcp/templates/server.py.j2', 'r') as f: - content = f.read() - -# Replace the simple help message with actual help command execution -old_help_section = '''{% if tool.help_cmd %} - - Help: Run `{{ tool.help_cmd }}` for more information. -{% endif %}''' - -new_help_section = '''{% if tool.help_cmd %} - - Help: -{% raw %} - {% set help_result = execute_command(tool.help_cmd) %} - {% if help_result.success %} - {{ help_result.stdout|indent(4, first=True) }} - {% else %} - Failed to get help: {{ help_result.stderr }} - {% endif %} -{% endraw %} -{% endif %}''' - -# Replace the content -updated_content = content.replace(old_help_section, new_help_section) - -# Write the updated template -with open('/home/blake/p/shellmcp/shellmcp/templates/server.py.j2', 'w') as f: - f.write(updated_content) - -print("Help command execution added to template!")