diff --git a/concore.py b/concore.py index 92f8ce8..a5b4973 100644 --- a/concore.py +++ b/concore.py @@ -7,6 +7,8 @@ import re import zmq import numpy as np +import signal + logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', @@ -81,6 +83,7 @@ def recv_json_with_retry(self): # Global ZeroMQ ports registry zmq_ports = {} +_cleanup_in_progress = False def init_zmq_port(port_name, port_type, address, socket_type_str): """ @@ -107,14 +110,45 @@ def init_zmq_port(port_name, port_type, address, socket_type_str): logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}") def terminate_zmq(): - for port in zmq_ports.values(): + """Clean up all ZMQ sockets and contexts before exit.""" + global _cleanup_in_progress + + if _cleanup_in_progress: + return # Already cleaning up, prevent reentrant calls + + if not zmq_ports: + return # No ports to clean up + + _cleanup_in_progress = True + print("\nCleaning up ZMQ resources...") + for port_name, port in zmq_ports.items(): try: port.socket.close() port.context.term() + print(f"Closed ZMQ port: {port_name}") except Exception as e: logging.error(f"Error while terminating ZMQ port {port.address}: {e}") + zmq_ports.clear() + _cleanup_in_progress = False +def signal_handler(sig, frame): + """Handle interrupt signals gracefully.""" + print(f"\nReceived signal {sig}, shutting down gracefully...") + # Prevent terminate_zmq from being called twice: once here and once via atexit + try: + atexit.unregister(terminate_zmq) + except Exception: + # If unregister fails for any reason, proceed with explicit cleanup anyway + pass + terminate_zmq() + sys.exit(0) + +# Register cleanup handlers atexit.register(terminate_zmq) +signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C +if not hasattr(sys, 'getwindowsversion'): + signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only) + # --- ZeroMQ Integration End --- diff --git a/concore_cli/README.md b/concore_cli/README.md index e29c765..e3b55b6 100644 --- a/concore_cli/README.md +++ b/concore_cli/README.md @@ -72,16 +72,23 @@ concore run workflow.graphml --source ./src --output ./build --auto-build Validates a GraphML workflow file before running. +**Options:** +- `-s, --source ` - Source directory to verify file references exist + Checks: - Valid XML structure - GraphML format compliance - Node and edge definitions - File references and naming conventions -- ZMQ vs file-based communication +- Source file existence (when --source provided) +- ZMQ port conflicts and reserved ports +- Circular dependencies (warns for control loops) +- Edge connectivity **Example:** ```bash concore validate workflow.graphml +concore validate workflow.graphml --source ./src ``` ### `concore status` diff --git a/concore_cli/cli.py b/concore_cli/cli.py index f714453..f9e35f0 100644 --- a/concore_cli/cli.py +++ b/concore_cli/cli.py @@ -47,10 +47,16 @@ def run(workflow_file, source, output, type, auto_build): @cli.command() @click.argument('workflow_file', type=click.Path(exists=True)) -def validate(workflow_file): +@click.option( + '--source', + '-s', + type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path), + help='Source directory to check file references', +) +def validate(workflow_file, source): """Validate a workflow file""" try: - validate_workflow(workflow_file, console) + validate_workflow(workflow_file, console, source) except Exception as e: console.print(f"[red]Error:[/red] {str(e)}") sys.exit(1) diff --git a/concore_cli/commands/validate.py b/concore_cli/commands/validate.py index fa1ea18..1845632 100644 --- a/concore_cli/commands/validate.py +++ b/concore_cli/commands/validate.py @@ -5,7 +5,7 @@ import re import xml.etree.ElementTree as ET -def validate_workflow(workflow_file, console): +def validate_workflow(workflow_file, console, source_dir=None): workflow_path = Path(workflow_file) console.print(f"[cyan]Validating:[/cyan] {workflow_path.name}") @@ -138,6 +138,12 @@ def validate_workflow(workflow_file, console): if file_edges > 0: info.append(f"File-based edges: {file_edges}") + if source_dir: + _check_source_files(soup, Path(source_dir), errors, warnings) + + _check_cycles(soup, errors, warnings) + _check_zmq_ports(soup, errors, warnings) + show_results(console, errors, warnings, info) except FileNotFoundError: @@ -145,6 +151,104 @@ def validate_workflow(workflow_file, console): except Exception as e: console.print(f"[red]Validation failed:[/red] {str(e)}") +def _check_source_files(soup, source_path, errors, warnings): + nodes = soup.find_all('node') + + for node in nodes: + label_tag = node.find('y:NodeLabel') or node.find('NodeLabel') + if not label_tag or not label_tag.text: + continue + + label = label_tag.text.strip() + if ':' not in label: + warnings.append(f"Skipping node with invalid label format (expected 'ID:filename')") + continue + + parts = label.split(':') + if len(parts) != 2: + warnings.append(f"Skipping node '{label}' with invalid format") + continue + + _, filename = parts + if not filename: + continue + + file_path = source_path / filename + if not file_path.exists(): + errors.append(f"Source file not found: {filename}") + +def _check_cycles(soup, errors, warnings): + nodes = soup.find_all('node') + edges = soup.find_all('edge') + + node_ids = [node.get('id') for node in nodes if node.get('id')] + if not node_ids: + return + + graph = {nid: [] for nid in node_ids} + for edge in edges: + source = edge.get('source') + target = edge.get('target') + if source and target and source in graph: + graph[source].append(target) + + def has_cycle_from(start, visited, rec_stack): + visited.add(start) + rec_stack.add(start) + + for neighbor in graph.get(start, []): + if neighbor not in visited: + if has_cycle_from(neighbor, visited, rec_stack): + return True + elif neighbor in rec_stack: + return True + + rec_stack.remove(start) + return False + + visited = set() + for node_id in node_ids: + if node_id not in visited: + if has_cycle_from(node_id, visited, set()): + warnings.append("Workflow contains cycles (expected for control loops)") + return + +def _check_zmq_ports(soup, errors, warnings): + edges = soup.find_all('edge') + port_pattern = re.compile(r"0x([a-fA-F0-9]+)_(\S+)") + + ports_used = {} + + for edge in edges: + label_tag = edge.find('y:EdgeLabel') or edge.find('EdgeLabel') + if not label_tag or not label_tag.text: + continue + + match = port_pattern.match(label_tag.text.strip()) + if not match: + continue + + port_hex = match.group(1) + port_name = match.group(2) + port_num = int(port_hex, 16) + + if port_num < 1: + errors.append(f"Invalid port number: {port_num} (0x{port_hex}) must be at least 1") + continue + elif port_num > 65535: + errors.append(f"Invalid port number: {port_num} (0x{port_hex}) exceeds maximum (65535)") + continue + + if port_num in ports_used: + existing_name = ports_used[port_num] + if existing_name != port_name: + errors.append(f"Port conflict: 0x{port_hex} used for both '{existing_name}' and '{port_name}'") + else: + ports_used[port_num] = port_name + + if port_num < 1024: + warnings.append(f"Port {port_num} (0x{port_hex}) is in reserved range (< 1024)") + def show_results(console, errors, warnings, info): if errors: console.print("[red]✗ Validation failed[/red]\n") diff --git a/tests/test_graph.py b/tests/test_graph.py index 97102dc..483fd06 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -128,6 +128,165 @@ def test_validate_valid_graph(self): self.assertIn('Validation passed', result.output) self.assertIn('Workflow is valid', result.output) + + def test_validate_missing_source_file(self): + content = ''' + + + + n0:missing.py + + + + ''' + filepath = self.create_graph_file('workflow.graphml', content) + source_dir = Path(self.temp_dir) / 'src' + source_dir.mkdir() + + result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)]) + + self.assertIn('Validation failed', result.output) + self.assertIn('Source file not found: missing.py', result.output) + + def test_validate_with_existing_source_file(self): + content = ''' + + + + n0:exists.py + + + + ''' + filepath = self.create_graph_file('workflow.graphml', content) + source_dir = Path(self.temp_dir) / 'src' + source_dir.mkdir() + (source_dir / 'exists.py').write_text('print("hello")') + + result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)]) + + self.assertIn('Validation passed', result.output) + + def test_validate_zmq_port_conflict(self): + content = ''' + + + + n0:script1.py + + + n1:script2.py + + + 0x1234_portA + + + 0x1234_portB + + + + ''' + filepath = self.create_graph_file('conflict.graphml', content) + + result = self.runner.invoke(cli, ['validate', filepath]) + + self.assertIn('Validation failed', result.output) + self.assertIn('Port conflict', result.output) + + def test_validate_reserved_port(self): + content = ''' + + + + n0:script1.py + + + n1:script2.py + + + 0x50_data + + + + ''' + filepath = self.create_graph_file('reserved.graphml', content) + + result = self.runner.invoke(cli, ['validate', filepath]) + + self.assertIn('Port 80', result.output) + self.assertIn('reserved range', result.output) + + def test_validate_cycle_detection(self): + content = ''' + + + + n0:controller.py + + + n1:plant.py + + + control_signal + + + sensor_data + + + + ''' + filepath = self.create_graph_file('cycle.graphml', content) + + result = self.runner.invoke(cli, ['validate', filepath]) + + self.assertIn('cycles', result.output) + self.assertIn('control loops', result.output) + + def test_validate_port_zero(self): + content = ''' + + + + n0:script1.py + + + n1:script2.py + + + 0x0_invalid + + + + ''' + filepath = self.create_graph_file('port_zero.graphml', content) + + result = self.runner.invoke(cli, ['validate', filepath]) + + self.assertIn('Validation failed', result.output) + self.assertIn('must be at least 1', result.output) + + def test_validate_port_exceeds_maximum(self): + content = ''' + + + + n0:script1.py + + + n1:script2.py + + + 0x10000_toobig + + + + ''' + filepath = self.create_graph_file('port_max.graphml', content) + + result = self.runner.invoke(cli, ['validate', filepath]) + + self.assertIn('Validation failed', result.output) + self.assertIn('exceeds maximum (65535)', result.output) if __name__ == '__main__': unittest.main() \ No newline at end of file