Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import re
import zmq
import numpy as np
import signal

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
Expand Down Expand Up @@ -81,6 +83,7 @@ def recv_json_with_retry(self):

# Global ZeroMQ ports registry
zmq_ports = {}
_cleanup_in_progress = False

def init_zmq_port(port_name, port_type, address, socket_type_str):
"""
Expand All @@ -107,14 +110,45 @@ def init_zmq_port(port_name, port_type, address, socket_type_str):
logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")

def terminate_zmq():
for port in zmq_ports.values():
"""Clean up all ZMQ sockets and contexts before exit."""
global _cleanup_in_progress

if _cleanup_in_progress:
return # Already cleaning up, prevent reentrant calls

if not zmq_ports:
return # No ports to clean up

_cleanup_in_progress = True
print("\nCleaning up ZMQ resources...")
for port_name, port in zmq_ports.items():
try:
port.socket.close()
port.context.term()
print(f"Closed ZMQ port: {port_name}")
except Exception as e:
logging.error(f"Error while terminating ZMQ port {port.address}: {e}")
zmq_ports.clear()
_cleanup_in_progress = False

def signal_handler(sig, frame):
"""Handle interrupt signals gracefully."""
print(f"\nReceived signal {sig}, shutting down gracefully...")
Comment on lines +123 to +136
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

terminate_zmq()/signal_handler() print directly to stdout (cleanup messages). This can pollute workflow output and makes log handling inconsistent with the rest of the file which uses logging. Prefer logging.info/warning (or gate prints behind a verbosity/TTY check) for these messages.

Copilot uses AI. Check for mistakes.
# Prevent terminate_zmq from being called twice: once here and once via atexit
try:
atexit.unregister(terminate_zmq)
except Exception:
# If unregister fails for any reason, proceed with explicit cleanup anyway
pass
terminate_zmq()
sys.exit(0)

# Register cleanup handlers
atexit.register(terminate_zmq)
signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
if not hasattr(sys, 'getwindowsversion'):
signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only)

Comment on lines +134 to +151
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description doesn’t mention changes to concore.py, but this PR adds signal handling and alters ZMQ shutdown behavior here. Either update the PR description to include this file/change or split it into a separate PR so the validate-related change set stays focused.

Copilot uses AI. Check for mistakes.
# --- ZeroMQ Integration End ---


Expand Down
9 changes: 8 additions & 1 deletion concore_cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,23 @@ concore run workflow.graphml --source ./src --output ./build --auto-build

Validates a GraphML workflow file before running.

**Options:**
- `-s, --source <dir>` - Source directory to verify file references exist

Checks:
- Valid XML structure
- GraphML format compliance
- Node and edge definitions
- File references and naming conventions
- ZMQ vs file-based communication
- Source file existence (when --source provided)
- ZMQ port conflicts and reserved ports
- Circular dependencies (warns for control loops)
- Edge connectivity

**Example:**
```bash
concore validate workflow.graphml
concore validate workflow.graphml --source ./src
```

### `concore status`
Expand Down
10 changes: 8 additions & 2 deletions concore_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,16 @@ def run(workflow_file, source, output, type, auto_build):

@cli.command()
@click.argument('workflow_file', type=click.Path(exists=True))
def validate(workflow_file):
@click.option(
'--source',
'-s',
type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
help='Source directory to check file references',
)
def validate(workflow_file, source):
"""Validate a workflow file"""
try:
validate_workflow(workflow_file, console)
validate_workflow(workflow_file, console, source)
except Exception as e:
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
Expand Down
106 changes: 105 additions & 1 deletion concore_cli/commands/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import re
import xml.etree.ElementTree as ET

def validate_workflow(workflow_file, console):
def validate_workflow(workflow_file, console, source_dir=None):
workflow_path = Path(workflow_file)

console.print(f"[cyan]Validating:[/cyan] {workflow_path.name}")
Expand Down Expand Up @@ -138,13 +138,117 @@ def validate_workflow(workflow_file, console):
if file_edges > 0:
info.append(f"File-based edges: {file_edges}")

if source_dir:
_check_source_files(soup, Path(source_dir), errors, warnings)

_check_cycles(soup, errors, warnings)
_check_zmq_ports(soup, errors, warnings)

show_results(console, errors, warnings, info)

except FileNotFoundError:
console.print(f"[red]Error:[/red] File not found: {workflow_path}")
except Exception as e:
console.print(f"[red]Validation failed:[/red] {str(e)}")

def _check_source_files(soup, source_path, errors, warnings):
nodes = soup.find_all('node')

Comment on lines +154 to +156
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_check_source_files() takes a warnings parameter but never uses it. Either remove the unused parameter (and update the call site) or add a warning for skipped/invalid node label formats so callers can understand why some nodes weren’t checked.

Copilot uses AI. Check for mistakes.
for node in nodes:
label_tag = node.find('y:NodeLabel') or node.find('NodeLabel')
if not label_tag or not label_tag.text:
continue

label = label_tag.text.strip()
if ':' not in label:
warnings.append(f"Skipping node with invalid label format (expected 'ID:filename')")
continue

parts = label.split(':')
if len(parts) != 2:
warnings.append(f"Skipping node '{label}' with invalid format")
continue

_, filename = parts
if not filename:
continue

file_path = source_path / filename
if not file_path.exists():
errors.append(f"Source file not found: {filename}")

def _check_cycles(soup, errors, warnings):
nodes = soup.find_all('node')
edges = soup.find_all('edge')

node_ids = [node.get('id') for node in nodes if node.get('id')]
if not node_ids:
return

graph = {nid: [] for nid in node_ids}
for edge in edges:
source = edge.get('source')
target = edge.get('target')
if source and target and source in graph:
graph[source].append(target)

def has_cycle_from(start, visited, rec_stack):
visited.add(start)
rec_stack.add(start)

for neighbor in graph.get(start, []):
if neighbor not in visited:
if has_cycle_from(neighbor, visited, rec_stack):
return True
elif neighbor in rec_stack:
return True

rec_stack.remove(start)
return False

visited = set()
for node_id in node_ids:
if node_id not in visited:
if has_cycle_from(node_id, visited, set()):
warnings.append("Workflow contains cycles (expected for control loops)")
return

def _check_zmq_ports(soup, errors, warnings):
edges = soup.find_all('edge')
port_pattern = re.compile(r"0x([a-fA-F0-9]+)_(\S+)")

ports_used = {}

for edge in edges:
label_tag = edge.find('y:EdgeLabel') or edge.find('EdgeLabel')
if not label_tag or not label_tag.text:
continue

match = port_pattern.match(label_tag.text.strip())
if not match:
continue

port_hex = match.group(1)
port_name = match.group(2)
port_num = int(port_hex, 16)

if port_num < 1:
errors.append(f"Invalid port number: {port_num} (0x{port_hex}) must be at least 1")
continue
elif port_num > 65535:
errors.append(f"Invalid port number: {port_num} (0x{port_hex}) exceeds maximum (65535)")
continue

if port_num in ports_used:
existing_name = ports_used[port_num]
if existing_name != port_name:
errors.append(f"Port conflict: 0x{port_hex} used for both '{existing_name}' and '{port_name}'")
else:
ports_used[port_num] = port_name

if port_num < 1024:
warnings.append(f"Port {port_num} (0x{port_hex}) is in reserved range (< 1024)")

def show_results(console, errors, warnings, info):
if errors:
console.print("[red]✗ Validation failed[/red]\n")
Expand Down
Loading
Loading