diff --git a/concore.py b/concore.py
index 92f8ce8..a5b4973 100644
--- a/concore.py
+++ b/concore.py
@@ -7,6 +7,8 @@
import re
import zmq
import numpy as np
+import signal
+
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
@@ -81,6 +83,7 @@ def recv_json_with_retry(self):
# Global ZeroMQ ports registry
zmq_ports = {}
+_cleanup_in_progress = False
def init_zmq_port(port_name, port_type, address, socket_type_str):
"""
@@ -107,14 +110,45 @@ def init_zmq_port(port_name, port_type, address, socket_type_str):
logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")
def terminate_zmq():
- for port in zmq_ports.values():
+ """Clean up all ZMQ sockets and contexts before exit."""
+ global _cleanup_in_progress
+
+ if _cleanup_in_progress:
+ return # Already cleaning up, prevent reentrant calls
+
+ if not zmq_ports:
+ return # No ports to clean up
+
+ _cleanup_in_progress = True
+ print("\nCleaning up ZMQ resources...")
+ for port_name, port in zmq_ports.items():
try:
port.socket.close()
port.context.term()
+ print(f"Closed ZMQ port: {port_name}")
except Exception as e:
logging.error(f"Error while terminating ZMQ port {port.address}: {e}")
+ zmq_ports.clear()
+ _cleanup_in_progress = False
+def signal_handler(sig, frame):
+ """Handle interrupt signals gracefully."""
+ print(f"\nReceived signal {sig}, shutting down gracefully...")
+ # Prevent terminate_zmq from being called twice: once here and once via atexit
+ try:
+ atexit.unregister(terminate_zmq)
+ except Exception:
+ # If unregister fails for any reason, proceed with explicit cleanup anyway
+ pass
+ terminate_zmq()
+ sys.exit(0)
+
+# Register cleanup handlers
atexit.register(terminate_zmq)
+signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
+if not hasattr(sys, 'getwindowsversion'):
+ signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only)
+
# --- ZeroMQ Integration End ---
diff --git a/concore_cli/README.md b/concore_cli/README.md
index e29c765..e3b55b6 100644
--- a/concore_cli/README.md
+++ b/concore_cli/README.md
@@ -72,16 +72,23 @@ concore run workflow.graphml --source ./src --output ./build --auto-build
Validates a GraphML workflow file before running.
+**Options:**
+- `-s, --source
` - Source directory to verify file references exist
+
Checks:
- Valid XML structure
- GraphML format compliance
- Node and edge definitions
- File references and naming conventions
-- ZMQ vs file-based communication
+- Source file existence (when --source provided)
+- ZMQ port conflicts and reserved ports
+- Circular dependencies (warns for control loops)
+- Edge connectivity
**Example:**
```bash
concore validate workflow.graphml
+concore validate workflow.graphml --source ./src
```
### `concore status`
diff --git a/concore_cli/cli.py b/concore_cli/cli.py
index f714453..f9e35f0 100644
--- a/concore_cli/cli.py
+++ b/concore_cli/cli.py
@@ -47,10 +47,16 @@ def run(workflow_file, source, output, type, auto_build):
@cli.command()
@click.argument('workflow_file', type=click.Path(exists=True))
-def validate(workflow_file):
+@click.option(
+ '--source',
+ '-s',
+ type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
+ help='Source directory to check file references',
+)
+def validate(workflow_file, source):
"""Validate a workflow file"""
try:
- validate_workflow(workflow_file, console)
+ validate_workflow(workflow_file, console, source)
except Exception as e:
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
diff --git a/concore_cli/commands/validate.py b/concore_cli/commands/validate.py
index fa1ea18..1845632 100644
--- a/concore_cli/commands/validate.py
+++ b/concore_cli/commands/validate.py
@@ -5,7 +5,7 @@
import re
import xml.etree.ElementTree as ET
-def validate_workflow(workflow_file, console):
+def validate_workflow(workflow_file, console, source_dir=None):
workflow_path = Path(workflow_file)
console.print(f"[cyan]Validating:[/cyan] {workflow_path.name}")
@@ -138,6 +138,12 @@ def validate_workflow(workflow_file, console):
if file_edges > 0:
info.append(f"File-based edges: {file_edges}")
+ if source_dir:
+ _check_source_files(soup, Path(source_dir), errors, warnings)
+
+ _check_cycles(soup, errors, warnings)
+ _check_zmq_ports(soup, errors, warnings)
+
show_results(console, errors, warnings, info)
except FileNotFoundError:
@@ -145,6 +151,104 @@ def validate_workflow(workflow_file, console):
except Exception as e:
console.print(f"[red]Validation failed:[/red] {str(e)}")
+def _check_source_files(soup, source_path, errors, warnings):
+ nodes = soup.find_all('node')
+
+ for node in nodes:
+ label_tag = node.find('y:NodeLabel') or node.find('NodeLabel')
+ if not label_tag or not label_tag.text:
+ continue
+
+ label = label_tag.text.strip()
+ if ':' not in label:
+ warnings.append(f"Skipping node with invalid label format (expected 'ID:filename')")
+ continue
+
+ parts = label.split(':')
+ if len(parts) != 2:
+ warnings.append(f"Skipping node '{label}' with invalid format")
+ continue
+
+ _, filename = parts
+ if not filename:
+ continue
+
+ file_path = source_path / filename
+ if not file_path.exists():
+ errors.append(f"Source file not found: {filename}")
+
+def _check_cycles(soup, errors, warnings):
+ nodes = soup.find_all('node')
+ edges = soup.find_all('edge')
+
+ node_ids = [node.get('id') for node in nodes if node.get('id')]
+ if not node_ids:
+ return
+
+ graph = {nid: [] for nid in node_ids}
+ for edge in edges:
+ source = edge.get('source')
+ target = edge.get('target')
+ if source and target and source in graph:
+ graph[source].append(target)
+
+ def has_cycle_from(start, visited, rec_stack):
+ visited.add(start)
+ rec_stack.add(start)
+
+ for neighbor in graph.get(start, []):
+ if neighbor not in visited:
+ if has_cycle_from(neighbor, visited, rec_stack):
+ return True
+ elif neighbor in rec_stack:
+ return True
+
+ rec_stack.remove(start)
+ return False
+
+ visited = set()
+ for node_id in node_ids:
+ if node_id not in visited:
+ if has_cycle_from(node_id, visited, set()):
+ warnings.append("Workflow contains cycles (expected for control loops)")
+ return
+
+def _check_zmq_ports(soup, errors, warnings):
+ edges = soup.find_all('edge')
+ port_pattern = re.compile(r"0x([a-fA-F0-9]+)_(\S+)")
+
+ ports_used = {}
+
+ for edge in edges:
+ label_tag = edge.find('y:EdgeLabel') or edge.find('EdgeLabel')
+ if not label_tag or not label_tag.text:
+ continue
+
+ match = port_pattern.match(label_tag.text.strip())
+ if not match:
+ continue
+
+ port_hex = match.group(1)
+ port_name = match.group(2)
+ port_num = int(port_hex, 16)
+
+ if port_num < 1:
+ errors.append(f"Invalid port number: {port_num} (0x{port_hex}) must be at least 1")
+ continue
+ elif port_num > 65535:
+ errors.append(f"Invalid port number: {port_num} (0x{port_hex}) exceeds maximum (65535)")
+ continue
+
+ if port_num in ports_used:
+ existing_name = ports_used[port_num]
+ if existing_name != port_name:
+ errors.append(f"Port conflict: 0x{port_hex} used for both '{existing_name}' and '{port_name}'")
+ else:
+ ports_used[port_num] = port_name
+
+ if port_num < 1024:
+ warnings.append(f"Port {port_num} (0x{port_hex}) is in reserved range (< 1024)")
+
def show_results(console, errors, warnings, info):
if errors:
console.print("[red]✗ Validation failed[/red]\n")
diff --git a/tests/test_graph.py b/tests/test_graph.py
index 97102dc..483fd06 100644
--- a/tests/test_graph.py
+++ b/tests/test_graph.py
@@ -128,6 +128,165 @@ def test_validate_valid_graph(self):
self.assertIn('Validation passed', result.output)
self.assertIn('Workflow is valid', result.output)
+
+ def test_validate_missing_source_file(self):
+ content = '''
+
+
+
+ n0:missing.py
+
+
+
+ '''
+ filepath = self.create_graph_file('workflow.graphml', content)
+ source_dir = Path(self.temp_dir) / 'src'
+ source_dir.mkdir()
+
+ result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)])
+
+ self.assertIn('Validation failed', result.output)
+ self.assertIn('Source file not found: missing.py', result.output)
+
+ def test_validate_with_existing_source_file(self):
+ content = '''
+
+
+
+ n0:exists.py
+
+
+
+ '''
+ filepath = self.create_graph_file('workflow.graphml', content)
+ source_dir = Path(self.temp_dir) / 'src'
+ source_dir.mkdir()
+ (source_dir / 'exists.py').write_text('print("hello")')
+
+ result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)])
+
+ self.assertIn('Validation passed', result.output)
+
+ def test_validate_zmq_port_conflict(self):
+ content = '''
+
+
+
+ n0:script1.py
+
+
+ n1:script2.py
+
+
+ 0x1234_portA
+
+
+ 0x1234_portB
+
+
+
+ '''
+ filepath = self.create_graph_file('conflict.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('Validation failed', result.output)
+ self.assertIn('Port conflict', result.output)
+
+ def test_validate_reserved_port(self):
+ content = '''
+
+
+
+ n0:script1.py
+
+
+ n1:script2.py
+
+
+ 0x50_data
+
+
+
+ '''
+ filepath = self.create_graph_file('reserved.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('Port 80', result.output)
+ self.assertIn('reserved range', result.output)
+
+ def test_validate_cycle_detection(self):
+ content = '''
+
+
+
+ n0:controller.py
+
+
+ n1:plant.py
+
+
+ control_signal
+
+
+ sensor_data
+
+
+
+ '''
+ filepath = self.create_graph_file('cycle.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('cycles', result.output)
+ self.assertIn('control loops', result.output)
+
+ def test_validate_port_zero(self):
+ content = '''
+
+
+
+ n0:script1.py
+
+
+ n1:script2.py
+
+
+ 0x0_invalid
+
+
+
+ '''
+ filepath = self.create_graph_file('port_zero.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('Validation failed', result.output)
+ self.assertIn('must be at least 1', result.output)
+
+ def test_validate_port_exceeds_maximum(self):
+ content = '''
+
+
+
+ n0:script1.py
+
+
+ n1:script2.py
+
+
+ 0x10000_toobig
+
+
+
+ '''
+ filepath = self.create_graph_file('port_max.graphml', content)
+
+ result = self.runner.invoke(cli, ['validate', filepath])
+
+ self.assertIn('Validation failed', result.output)
+ self.assertIn('exceeds maximum (65535)', result.output)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file