From f534d711ce990d4d00c54283b1727be250e3f2dd Mon Sep 17 00:00:00 2001 From: GREENRAT-K405 Date: Mon, 9 Feb 2026 00:48:14 +0530 Subject: [PATCH 1/7] update concoredocker.py to be up-to-date with concore.py --- concoredocker.py | 214 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 185 insertions(+), 29 deletions(-) diff --git a/concoredocker.py b/concoredocker.py index 65463c9..31ce666 100644 --- a/concoredocker.py +++ b/concoredocker.py @@ -2,13 +2,112 @@ from ast import literal_eval import re import os +import logging +import zmq +import numpy as np + +logging.basicConfig( + level=logging.INFO, + format='%(levelname)s - %(message)s' +) + +class ZeroMQPort: + def __init__(self, port_type, address, zmq_socket_type): + self.context = zmq.Context() + self.socket = self.context.socket(zmq_socket_type) + self.port_type = port_type + self.address = address + + # Configure timeouts & immediate close on failure + self.socket.setsockopt(zmq.RCVTIMEO, 2000) # 2 sec receive timeout + self.socket.setsockopt(zmq.SNDTIMEO, 2000) # 2 sec send timeout + self.socket.setsockopt(zmq.LINGER, 0) # Drop pending messages on close + + # Bind or connect + if self.port_type == "bind": + self.socket.bind(address) + logging.info(f"ZMQ Port bound to {address}") + else: + self.socket.connect(address) + logging.info(f"ZMQ Port connected to {address}") + + def send_json_with_retry(self, message): + """Send JSON message with retries if timeout occurs.""" + for attempt in range(5): + try: + self.socket.send_json(message) + return + except zmq.Again: + logging.warning(f"Send timeout (attempt {attempt + 1}/5)") + time.sleep(0.5) + logging.error("Failed to send after retries.") + return + + def recv_json_with_retry(self): + """Receive JSON message with retries if timeout occurs.""" + for attempt in range(5): + try: + return self.socket.recv_json() + except zmq.Again: + logging.warning(f"Receive timeout (attempt {attempt + 1}/5)") + time.sleep(0.5) + logging.error("Failed to receive after retries.") + return None + +# Global ZeroMQ ports registry +zmq_ports = {} + +def init_zmq_port(port_name, port_type, address, socket_type_str): + """ + Initializes and registers a ZeroMQ port. + port_name (str): A unique name for this ZMQ port. + port_type (str): "bind" or "connect". + address (str): The ZMQ address (e.g., "tcp://*:5555", "tcp://localhost:5555"). + socket_type_str (str): String representation of ZMQ socket type (e.g., "REQ", "REP", "PUB", "SUB"). + """ + if port_name in zmq_ports: + logging.info(f"ZMQ Port {port_name} already initialized.") + return#avoid reinstallation + try: + # Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP) + zmq_socket_type = getattr(zmq, socket_type_str.upper()) + zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type) + logging.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}") + except AttributeError: + logging.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.") + except zmq.error.ZMQError as e: + logging.error(f"Error initializing ZMQ port {port_name} on {address}: {e}") + except Exception as e: + logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}") + +def terminate_zmq(): + for port in zmq_ports.values(): + try: + port.socket.close() + port.context.term() + except Exception as e: + logging.error(f"Error while terminating ZMQ port {port.address}: {e}") +# --- ZeroMQ Integration End --- + +# NumPy Type Conversion Helper +def convert_numpy_to_python(obj): + if isinstance(obj, np.generic): + return obj.item() + elif isinstance(obj, list): + return [convert_numpy_to_python(item) for item in obj] + elif isinstance(obj, tuple): + return tuple(convert_numpy_to_python(item) for item in obj) + elif isinstance(obj, dict): + return {key: convert_numpy_to_python(value) for key, value in obj.items()} + else: + return obj def safe_literal_eval(filename, defaultValue): try: with open(filename, "r") as file: return literal_eval(file.read()) except (FileNotFoundError, SyntaxError, ValueError, Exception) as e: - print(f"Error reading {filename}: {e}") + logging.error(f"Error reading {filename}: {e}") return defaultValue iport = safe_literal_eval("concore.iport", {}) @@ -21,8 +120,8 @@ def safe_literal_eval(filename, defaultValue): inpath = os.path.abspath("/in") outpath = os.path.abspath("/out") simtime = 0 -concore_params_file = os.path.join(inpath, "1", "concore.params") -concore_maxtime_file = os.path.join(inpath, "1", "concore.maxtime") +concore_params_file = os.path.join(inpath + "1", "concore.params") +concore_maxtime_file = os.path.join(inpath + "1", "concore.maxtime") #9/21/22 def parse_params(sparams): @@ -42,7 +141,7 @@ def parse_params(sparams): pass # keep backward compatibility: comma-separated params - for item in s.split(","): + for item in s.split(";"): if "=" in item: key, value = item.split("=", 1) key = key.strip() @@ -66,12 +165,13 @@ def parse_params(sparams): sparams = sparams[:sparams.find('"')] if sparams: - print("parsing sparams:", sparams) + logging.debug("parsing sparams: "+sparams) params = parse_params(sparams) + logging.debug("parsed params: " + str(params)) else: params = dict() except Exception as e: - print(f"Error reading concore.params: {e}") + logging.error(f"Error reading concore.params: {e}") params = dict() #9/30/22 @@ -93,74 +193,130 @@ def unchanged(): olds = s return False -def read(port, name, initstr): +def read(port_identifier, name, initstr_val): global s, simtime, retrycount - max_retries=5 + + default_return_val = initstr_val + if isinstance(initstr_val, str): + try: + default_return_val = literal_eval(initstr_val) + except (SyntaxError, ValueError): + pass + + if isinstance(port_identifier, str) and port_identifier in zmq_ports: + zmq_p = zmq_ports[port_identifier] + try: + message = zmq_p.recv_json_with_retry() + return message + except zmq.error.ZMQError as e: + logging.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.") + return default_return_val + except Exception as e: + logging.error(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.") + return default_return_val + + try: + file_port_num = int(port_identifier) + except ValueError: + logging.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.") + return default_return_val + time.sleep(delay) - file_path = os.path.join(inpath, str(port), name) + file_path = os.path.join(inpath, str(file_port_num), name) try: with open(file_path, "r") as infile: ins = infile.read() except FileNotFoundError: - print(f"File {file_path} not found, using default value.") - ins = initstr + ins = str(initstr_val) + s += ins except Exception as e: - print(f"Error reading {file_path}: {e}") - return initstr + logging.error(f"Error reading {file_path}: {e}") + return default_return_val attempts = 0 + max_retries = 5 while len(ins) == 0 and attempts < max_retries: time.sleep(delay) try: with open(file_path, "r") as infile: ins = infile.read() except Exception as e: - print(f"Retry {attempts + 1}: Error reading {file_path} - {e}") + logging.warning(f"Retry {attempts + 1}: Error reading {file_path} - {e}") attempts += 1 retrycount += 1 if len(ins) == 0: - print(f"Max retries reached for {file_path}, using default value.") - return initstr + logging.error(f"Max retries reached for {file_path}, using default value.") + return default_return_val s += ins try: inval = literal_eval(ins) - simtime = max(simtime, inval[0]) - return inval[1:] + if isinstance(inval, list) and len(inval) > 0: + current_simtime_from_file = inval[0] + if isinstance(current_simtime_from_file, (int, float)): + simtime = max(simtime, current_simtime_from_file) + return inval[1:] + else: + logging.warning(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.") + return inval except Exception as e: - print(f"Error parsing {ins}: {e}") - return initstr + logging.error(f"Error parsing {ins}: {e}") + return default_return_val - -def write(port, name, val, delta=0): +def write(port_identifier, name, val, delta=0): global simtime - file_path = os.path.join(outpath, str(port), name) + + if isinstance(port_identifier, str) and port_identifier in zmq_ports: + zmq_p = zmq_ports[port_identifier] + try: + zmq_p.send_json_with_retry(val) + except zmq.error.ZMQError as e: + logging.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}") + except Exception as e: + logging.error(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}") + + try: + file_port_num = int(port_identifier) + file_path = os.path.join(outpath, str(file_port_num), name) + except ValueError: + logging.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.") + return if isinstance(val, str): time.sleep(2 * delay) elif not isinstance(val, list): - print("write must have list or str") + logging.error("write must have list or str") return try: with open(file_path, "w") as outfile: if isinstance(val, list): - outfile.write(str([simtime + delta] + val)) + val_converted = convert_numpy_to_python(val) + outfile.write(str([simtime + delta] + val_converted)) simtime += delta else: outfile.write(val) except Exception as e: - print(f"Error writing to {file_path}: {e}") + logging.error(f"Error writing to {file_path}: {e}") def initval(simtime_val): global simtime try: val = literal_eval(simtime_val) - simtime = val[0] - return val[1:] + if isinstance(val, list) and len(val) > 0: + first_element = val[0] + if isinstance(first_element, (int, float)): + simtime = first_element + return val[1:] + else: + logging.error(f"Error: First element in initval string '{simtime_val}' is not a number.") + return val[1:] if len(val) > 1 else [] + else: + logging.error(f"Error: initval string '{simtime_val}' is not a list or is empty.") + return [] except Exception as e: - print(f"Error parsing simtime_val: {e}") + logging.error(f"Error parsing simtime_val: {e}") return [] From 2bad98f9476f707ea3d996b47d38e9b2c500d8f1 Mon Sep 17 00:00:00 2001 From: GREENRAT-K405 Date: Mon, 9 Feb 2026 00:55:18 +0530 Subject: [PATCH 2/7] keep the logs consistent with concore.py --- concoredocker.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/concoredocker.py b/concoredocker.py index 31ce666..880a774 100644 --- a/concoredocker.py +++ b/concoredocker.py @@ -231,7 +231,7 @@ def read(port_identifier, name, initstr_val): ins = str(initstr_val) s += ins except Exception as e: - logging.error(f"Error reading {file_path}: {e}") + logging.error(f"Error reading {file_path}: {e}. Using default value.") return default_return_val attempts = 0 @@ -262,7 +262,7 @@ def read(port_identifier, name, initstr_val): logging.warning(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.") return inval except Exception as e: - logging.error(f"Error parsing {ins}: {e}") + logging.error(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.") return default_return_val def write(port_identifier, name, val, delta=0): @@ -287,14 +287,15 @@ def write(port_identifier, name, val, delta=0): if isinstance(val, str): time.sleep(2 * delay) elif not isinstance(val, list): - logging.error("write must have list or str") + logging.error(f"File write to {file_path} must have list or str value, got {type(val)}") return try: with open(file_path, "w") as outfile: if isinstance(val, list): val_converted = convert_numpy_to_python(val) - outfile.write(str([simtime + delta] + val_converted)) + data_to_write = [simtime + delta] + val_converted + outfile.write(str(data_to_write)) simtime += delta else: outfile.write(val) @@ -311,12 +312,12 @@ def initval(simtime_val): simtime = first_element return val[1:] else: - logging.error(f"Error: First element in initval string '{simtime_val}' is not a number.") + logging.error(f"Error: First element in initval string '{simtime_val}' is not a number. Using data part as is or empty.") return val[1:] if len(val) > 1 else [] else: - logging.error(f"Error: initval string '{simtime_val}' is not a list or is empty.") + logging.error(f"Error: initval string '{simtime_val}' is not a list or is empty. Returning empty list.") return [] except Exception as e: - logging.error(f"Error parsing simtime_val: {e}") + logging.error(f"Error parsing simtime_val_str '{simtime_val}': {e}. Returning empty list.") return [] From 073c6a6038bbeab607fc19697805a2c6976034af Mon Sep 17 00:00:00 2001 From: PARAM KANADA Date: Mon, 9 Feb 2026 01:13:13 +0530 Subject: [PATCH 3/7] Update concoredocker.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- concoredocker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/concoredocker.py b/concoredocker.py index 880a774..3e296f5 100644 --- a/concoredocker.py +++ b/concoredocker.py @@ -222,7 +222,8 @@ def read(port_identifier, name, initstr_val): return default_return_val time.sleep(delay) - file_path = os.path.join(inpath, str(file_port_num), name) + # Construct file path consistent with other components (e.g., /in1/) + file_path = os.path.join(inpath + str(file_port_num), name) try: with open(file_path, "r") as infile: From b6bae58b48ba32d1992c526ad373a8be7cbeb63d Mon Sep 17 00:00:00 2001 From: PARAM KANADA Date: Mon, 9 Feb 2026 01:15:31 +0530 Subject: [PATCH 4/7] Update concoredocker.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- concoredocker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/concoredocker.py b/concoredocker.py index 3e296f5..1a3f6b4 100644 --- a/concoredocker.py +++ b/concoredocker.py @@ -201,7 +201,11 @@ def read(port_identifier, name, initstr_val): try: default_return_val = literal_eval(initstr_val) except (SyntaxError, ValueError): - pass + # Failed to parse initstr_val; fall back to the original string value. + logging.debug( + "Could not parse initstr_val %r with literal_eval; using raw string as default.", + initstr_val + ) if isinstance(port_identifier, str) and port_identifier in zmq_ports: zmq_p = zmq_ports[port_identifier] From f597ad0cce9164daf6cebe7874de7b8e118272c4 Mon Sep 17 00:00:00 2001 From: PARAM KANADA Date: Mon, 9 Feb 2026 01:21:21 +0530 Subject: [PATCH 5/7] Update concoredocker.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- concoredocker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concoredocker.py b/concoredocker.py index 1a3f6b4..c95a178 100644 --- a/concoredocker.py +++ b/concoredocker.py @@ -284,7 +284,7 @@ def write(port_identifier, name, val, delta=0): try: file_port_num = int(port_identifier) - file_path = os.path.join(outpath, str(file_port_num), name) + file_path = os.path.join(outpath + str(file_port_num), name) except ValueError: logging.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.") return From f9dd68880a4b4bdea5a936119e916e821dea7107 Mon Sep 17 00:00:00 2001 From: PARAM KANADA Date: Mon, 9 Feb 2026 01:25:12 +0530 Subject: [PATCH 6/7] Change error logging to info level for file read --- concoredocker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concoredocker.py b/concoredocker.py index c95a178..18e0078 100644 --- a/concoredocker.py +++ b/concoredocker.py @@ -107,7 +107,7 @@ def safe_literal_eval(filename, defaultValue): with open(filename, "r") as file: return literal_eval(file.read()) except (FileNotFoundError, SyntaxError, ValueError, Exception) as e: - logging.error(f"Error reading {filename}: {e}") + logging.info(f"Error reading {filename}: {e}") return defaultValue iport = safe_literal_eval("concore.iport", {}) From 1e765584f443f42796deda865d5b607ccea9ac83 Mon Sep 17 00:00:00 2001 From: PARAM KANADA Date: Mon, 9 Feb 2026 02:04:30 +0530 Subject: [PATCH 7/7] Update regarding moving away from backward compatibility for params Updated comment to reflect potential breaking change regarding comma-separated parameters. --- concoredocker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concoredocker.py b/concoredocker.py index 18e0078..d337c38 100644 --- a/concoredocker.py +++ b/concoredocker.py @@ -140,7 +140,7 @@ def parse_params(sparams): except (ValueError, SyntaxError): pass - # keep backward compatibility: comma-separated params + # Potentially breaking backward compatibility: moving away from the comma-separated params for item in s.split(";"): if "=" in item: key, value = item.split("=", 1)