-
Notifications
You must be signed in to change notification settings - Fork 32
Feature/enhanced workflow validation #229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
ff3206a
b3268f3
6355f7b
6c6d2ae
514d522
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,8 @@ | |
| import re | ||
| import zmq | ||
| import numpy as np | ||
| import signal | ||
|
|
||
| logging.basicConfig( | ||
| level=logging.INFO, | ||
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | ||
|
|
@@ -81,6 +83,7 @@ def recv_json_with_retry(self): | |
|
|
||
| # Global ZeroMQ ports registry | ||
| zmq_ports = {} | ||
| _cleanup_in_progress = False | ||
|
|
||
| def init_zmq_port(port_name, port_type, address, socket_type_str): | ||
| """ | ||
|
|
@@ -107,14 +110,45 @@ def init_zmq_port(port_name, port_type, address, socket_type_str): | |
| logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}") | ||
|
|
||
| def terminate_zmq(): | ||
| for port in zmq_ports.values(): | ||
| """Clean up all ZMQ sockets and contexts before exit.""" | ||
| global _cleanup_in_progress | ||
|
|
||
| if _cleanup_in_progress: | ||
| return # Already cleaning up, prevent reentrant calls | ||
|
|
||
| if not zmq_ports: | ||
| return # No ports to clean up | ||
|
|
||
| _cleanup_in_progress = True | ||
| print("\nCleaning up ZMQ resources...") | ||
| for port_name, port in zmq_ports.items(): | ||
| try: | ||
| port.socket.close() | ||
| port.context.term() | ||
| print(f"Closed ZMQ port: {port_name}") | ||
| except Exception as e: | ||
| logging.error(f"Error while terminating ZMQ port {port.address}: {e}") | ||
| zmq_ports.clear() | ||
| _cleanup_in_progress = False | ||
|
|
||
| def signal_handler(sig, frame): | ||
| """Handle interrupt signals gracefully.""" | ||
| print(f"\nReceived signal {sig}, shutting down gracefully...") | ||
| # Prevent terminate_zmq from being called twice: once here and once via atexit | ||
| try: | ||
| atexit.unregister(terminate_zmq) | ||
| except Exception: | ||
| # If unregister fails for any reason, proceed with explicit cleanup anyway | ||
| pass | ||
| terminate_zmq() | ||
| sys.exit(0) | ||
|
|
||
| # Register cleanup handlers | ||
| atexit.register(terminate_zmq) | ||
| signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C | ||
| if not hasattr(sys, 'getwindowsversion'): | ||
| signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only) | ||
|
|
||
|
Comment on lines
+134
to
+151
|
||
| # --- ZeroMQ Integration End --- | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,7 @@ | |
| import re | ||
| import xml.etree.ElementTree as ET | ||
|
|
||
| def validate_workflow(workflow_file, console): | ||
| def validate_workflow(workflow_file, console, source_dir=None): | ||
| workflow_path = Path(workflow_file) | ||
|
|
||
| console.print(f"[cyan]Validating:[/cyan] {workflow_path.name}") | ||
|
|
@@ -138,13 +138,117 @@ def validate_workflow(workflow_file, console): | |
| if file_edges > 0: | ||
| info.append(f"File-based edges: {file_edges}") | ||
|
|
||
| if source_dir: | ||
| _check_source_files(soup, Path(source_dir), errors, warnings) | ||
|
|
||
| _check_cycles(soup, errors, warnings) | ||
| _check_zmq_ports(soup, errors, warnings) | ||
|
|
||
| show_results(console, errors, warnings, info) | ||
|
|
||
| except FileNotFoundError: | ||
| console.print(f"[red]Error:[/red] File not found: {workflow_path}") | ||
| except Exception as e: | ||
| console.print(f"[red]Validation failed:[/red] {str(e)}") | ||
|
|
||
| def _check_source_files(soup, source_path, errors, warnings): | ||
| nodes = soup.find_all('node') | ||
|
|
||
|
Comment on lines
+154
to
+156
|
||
| for node in nodes: | ||
| label_tag = node.find('y:NodeLabel') or node.find('NodeLabel') | ||
| if not label_tag or not label_tag.text: | ||
| continue | ||
|
|
||
| label = label_tag.text.strip() | ||
| if ':' not in label: | ||
| warnings.append(f"Skipping node with invalid label format (expected 'ID:filename')") | ||
| continue | ||
|
|
||
| parts = label.split(':') | ||
| if len(parts) != 2: | ||
| warnings.append(f"Skipping node '{label}' with invalid format") | ||
| continue | ||
|
|
||
| _, filename = parts | ||
| if not filename: | ||
| continue | ||
|
|
||
| file_path = source_path / filename | ||
| if not file_path.exists(): | ||
| errors.append(f"Source file not found: {filename}") | ||
|
|
||
| def _check_cycles(soup, errors, warnings): | ||
| nodes = soup.find_all('node') | ||
| edges = soup.find_all('edge') | ||
|
|
||
| node_ids = [node.get('id') for node in nodes if node.get('id')] | ||
| if not node_ids: | ||
| return | ||
|
|
||
| graph = {nid: [] for nid in node_ids} | ||
| for edge in edges: | ||
| source = edge.get('source') | ||
| target = edge.get('target') | ||
| if source and target and source in graph: | ||
| graph[source].append(target) | ||
|
|
||
| def has_cycle_from(start, visited, rec_stack): | ||
| visited.add(start) | ||
| rec_stack.add(start) | ||
|
|
||
| for neighbor in graph.get(start, []): | ||
| if neighbor not in visited: | ||
| if has_cycle_from(neighbor, visited, rec_stack): | ||
| return True | ||
| elif neighbor in rec_stack: | ||
| return True | ||
|
|
||
| rec_stack.remove(start) | ||
| return False | ||
|
|
||
| visited = set() | ||
| for node_id in node_ids: | ||
| if node_id not in visited: | ||
| if has_cycle_from(node_id, visited, set()): | ||
| warnings.append("Workflow contains cycles (expected for control loops)") | ||
| return | ||
|
|
||
| def _check_zmq_ports(soup, errors, warnings): | ||
| edges = soup.find_all('edge') | ||
| port_pattern = re.compile(r"0x([a-fA-F0-9]+)_(\S+)") | ||
|
|
||
| ports_used = {} | ||
|
|
||
| for edge in edges: | ||
| label_tag = edge.find('y:EdgeLabel') or edge.find('EdgeLabel') | ||
| if not label_tag or not label_tag.text: | ||
| continue | ||
|
|
||
| match = port_pattern.match(label_tag.text.strip()) | ||
| if not match: | ||
| continue | ||
|
|
||
| port_hex = match.group(1) | ||
| port_name = match.group(2) | ||
| port_num = int(port_hex, 16) | ||
|
|
||
| if port_num < 1: | ||
| errors.append(f"Invalid port number: {port_num} (0x{port_hex}) must be at least 1") | ||
| continue | ||
| elif port_num > 65535: | ||
| errors.append(f"Invalid port number: {port_num} (0x{port_hex}) exceeds maximum (65535)") | ||
| continue | ||
|
|
||
| if port_num in ports_used: | ||
| existing_name = ports_used[port_num] | ||
| if existing_name != port_name: | ||
| errors.append(f"Port conflict: 0x{port_hex} used for both '{existing_name}' and '{port_name}'") | ||
| else: | ||
| ports_used[port_num] = port_name | ||
|
|
||
| if port_num < 1024: | ||
| warnings.append(f"Port {port_num} (0x{port_hex}) is in reserved range (< 1024)") | ||
|
|
||
| def show_results(console, errors, warnings, info): | ||
| if errors: | ||
| console.print("[red]✗ Validation failed[/red]\n") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
terminate_zmq()/signal_handler() print directly to stdout (cleanup messages). This can pollute workflow output and makes log handling inconsistent with the rest of the file which uses logging. Prefer logging.info/warning (or gate prints behind a verbosity/TTY check) for these messages.