diff --git a/libensemble/history.py b/libensemble/history.py
index 3d3b5bc6f..388ba08b1 100644
--- a/libensemble/history.py
+++ b/libensemble/history.py
@@ -82,6 +82,8 @@ def __init__(
         H = np.zeros(L + len(H0), dtype=specs_dtype_list)

         H["sim_id"][-L:] = -1
+        if "_id" in H.dtype.names:
+            H["_id"][-L:] = -1
         H["sim_started_time"][-L:] = np.inf
         H["gen_informed_time"][-L:] = np.inf
@@ -270,6 +272,8 @@ def grow_H(self, k: int) -> None:
         """
         H_1 = np.zeros(k, dtype=self.H.dtype)
         H_1["sim_id"] = -1
+        if "_id" in H_1.dtype.names:
+            H_1["_id"] = -1
         H_1["sim_started_time"] = np.inf
         H_1["gen_informed_time"] = np.inf
         if "resource_sets" in H_1.dtype.names:
diff --git a/libensemble/manager.py b/libensemble/manager.py
index 97f8f8225..22ae8b5d3 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -410,6 +410,14 @@ def _freeup_resources(self, w: int) -> None:
         if self.resources:
             self.resources.resource_manager.free_rsets(w)

+    def _ensure_sim_id_in_persis_in(self, D: npt.NDArray) -> None:
+        """Add sim_id to gen_specs persis_in if generator output contains sim_id (gest-api style generators only)"""
+        if self.gen_specs.get("generator") and len(D) > 0 and "sim_id" in D.dtype.names:
+            if "persis_in" not in self.gen_specs:
+                self.gen_specs["persis_in"] = []
+            if "sim_id" not in self.gen_specs["persis_in"]:
+                self.gen_specs["persis_in"].append("sim_id")
+
     def _send_work_order(self, Work: dict, w: int) -> None:
         """Sends an allocation function order to a worker"""
         logger.debug(f"Manager sending work unit to worker {w}")
@@ -483,6 +491,7 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -> None:
             final_data = D_recv.get("calc_out", None)
             if isinstance(final_data, np.ndarray):
                 if calc_status is FINISHED_PERSISTENT_GEN_TAG and self.libE_specs.get("use_persis_return_gen", False):
+                    self._ensure_sim_id_in_persis_in(final_data)
                     self.hist.update_history_x_in(w, final_data, self.W[w]["gen_started_time"])
                 elif calc_status is FINISHED_PERSISTENT_SIM_TAG and self.libE_specs.get("use_persis_return_sim", False):
                     self.hist.update_history_f(D_recv, self.kill_canceled_sims)
@@ -500,7 +509,9 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -> None:
             if calc_type == EVAL_SIM_TAG:
                 self.hist.update_history_f(D_recv, self.kill_canceled_sims)
             if calc_type == EVAL_GEN_TAG:
-                self.hist.update_history_x_in(w, D_recv["calc_out"], self.W[w]["gen_started_time"])
+                D = D_recv["calc_out"]
+                self._ensure_sim_id_in_persis_in(D)
+                self.hist.update_history_x_in(w, D, self.W[w]["gen_started_time"])
                 assert (
                     len(D_recv["calc_out"]) or np.any(self.W["active"]) or self.W[w]["persis_state"]
                 ), "Gen must return work when is is the only thing active and not persistent."
diff --git a/libensemble/specs.py b/libensemble/specs.py
index dac1baae4..a38efd4cb 100644
--- a/libensemble/specs.py
+++ b/libensemble/specs.py
@@ -247,6 +247,10 @@ def set_fields_from_vocs(self):
                    persis_in_fields.extend(list(obj.keys()))
            self.persis_in = persis_in_fields

+        # Set inputs: same as persis_in for gest-api generators (needed for H0 ingestion)
+        if not self.inputs and self.generator is not None:
+            self.inputs = self.persis_in
+
        # Set outputs: variables + constants (what the generator produces)
        if not self.outputs:
            out_fields = []
@@ -257,6 +261,17 @@ def set_fields_from_vocs(self):
                out_fields.append(_convert_dtype_to_output_tuple(name, dtype))
            self.outputs = out_fields

+        # Add _id field if generator returns_id is True
+        if self.generator is not None and getattr(self.generator, "returns_id", False):
+            if self.outputs is None:
+                self.outputs = []
+            if "_id" not in [f[0] for f in self.outputs]:
+                self.outputs.append(("_id", int))
+            if self.persis_in is None:
+                self.persis_in = []
+            if "_id" not in self.persis_in:
+                self.persis_in.append("_id")
+
        return self
diff --git a/libensemble/tests/regression_tests/test_optimas_ax_mf.py b/libensemble/tests/regression_tests/test_optimas_ax_mf.py
new file mode 100644
index 000000000..b6f43b3ed
--- /dev/null
+++ b/libensemble/tests/regression_tests/test_optimas_ax_mf.py
@@ -0,0 +1,84 @@
+"""
+Tests libEnsemble with Optimas Multi-Fidelity Ax Generator
+
+*****currently fixing nworkers to batch_size*****
+
+Execute via one of the following commands (e.g. 4 workers):
+   mpiexec -np 5 python test_optimas_ax_mf.py
+   python test_optimas_ax_mf.py -n 4
+
+When running with the above commands, the number of concurrent evaluations of
+the objective function will be 4 as the generator is on the manager.
+
+"""
+
+# Do not change these lines - they are parsed by run-tests.sh
+# TESTSUITE_COMMS: mpi local
+# TESTSUITE_NPROCS: 4
+# TESTSUITE_EXTRA: true
+
+import numpy as np
+
+from gest_api.vocs import VOCS
+from optimas.generators import AxMultiFidelityGenerator
+
+from libensemble import Ensemble
+from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f
+from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs
+
+
+def eval_func_mf(input_params):
+    """Evaluation function for multifidelity test."""
+    x0 = input_params["x0"]
+    x1 = input_params["x1"]
+    resolution = input_params["res"]
+    result = -(
+        (x0 + 10 * np.cos(x0 + 0.1 * resolution))
+        * (x1 + 5 * np.cos(x1 - 0.2 * resolution))
+    )
+    return {"f": result}
+
+
+# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
+if __name__ == "__main__":
+
+    n = 2
+    batch_size = 2
+
+    libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size)
+
+    vocs = VOCS(
+        variables={"x0": [-50.0, 5.0], "x1": [-5.0, 15.0], "res": [1.0, 8.0]},
+        objectives={"f": "MAXIMIZE"},
+    )
+
+    gen = AxMultiFidelityGenerator(vocs=vocs)
+
+    gen_specs = GenSpecs(
+        generator=gen,
+        batch_size=batch_size,
+        vocs=vocs,
+    )
+
+    sim_specs = SimSpecs(
+        simulator=eval_func_mf,
+        vocs=vocs,
+    )
+
+    alloc_specs = AllocSpecs(alloc_f=alloc_f)
+    exit_criteria = ExitCriteria(sim_max=6)
+
+    workflow = Ensemble(
+        libE_specs=libE_specs,
+        sim_specs=sim_specs,
+        alloc_specs=alloc_specs,
+        gen_specs=gen_specs,
+        exit_criteria=exit_criteria,
+    )
+
+    H, _, _ = workflow.run()
+
+    # Save results on the manager
+    if workflow.is_manager:
+        workflow.save_output(__file__)
+        print(f"Completed {len(H)} simulations")
diff --git a/libensemble/tests/regression_tests/test_optimas_ax_multitask.py b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py
new file mode 100644
index 000000000..04a2b5430
--- /dev/null
+++ b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py
@@ -0,0 +1,110 @@
+"""
+Tests libEnsemble with Optimas Multitask Ax Generator
+
+Runs an initial ensemble, followed by another using the first as an H0.
+
+*****currently fixing nworkers to batch_size*****
+
+Execute via one of the following commands (e.g. 4 workers):
+   mpiexec -np 5 python test_optimas_ax_multitask.py
+   python test_optimas_ax_multitask.py -n 4
+
+When running with the above commands, the number of concurrent evaluations of
+the objective function will be 4 as the generator is on the manager.
+
+Issues: In some cases, the generator fails to produce points. This is
+intermittent and can be seen by the message "alloc_f did not return any work".
+This needs to be resolved in the generator by generating extra points
+as needed (excluding those already generated).
+"""
+
+# Do not change these lines - they are parsed by run-tests.sh
+# TESTSUITE_COMMS: local
+# TESTSUITE_NPROCS: 4
+# TESTSUITE_EXTRA: true
+# TESTSUITE_EXCLUDE: true
+
+import numpy as np
+from gest_api.vocs import VOCS
+
+from optimas.core import Task
+from optimas.generators import AxMultitaskGenerator
+
+from libensemble import Ensemble
+from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f
+from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs
+
+
+def eval_func_multitask(input_params):
+    """Evaluation function for task1 or task2 in multitask test"""
+    print(f"input_params: {input_params}")
+    x0 = input_params["x0"]
+    x1 = input_params["x1"]
+    trial_type = input_params["trial_type"]
+
+    if trial_type == "task_1":
+        result = -(x0 + 10 * np.cos(x0)) * (x1 + 5 * np.cos(x1))
+    else:
+        result = -0.5 * (x0 + 10 * np.cos(x0)) * (x1 + 5 * np.cos(x1))
+
+    output_params = {"f": result}
+    return output_params
+
+
+# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
+if __name__ == "__main__":
+
+    n = 2
+    batch_size = 2
+
+    libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size)
+
+    vocs = VOCS(
+        variables={
+            "x0": [-50.0, 5.0],
+            "x1": [-5.0, 15.0],
+            "trial_type": {"task_1", "task_2"},
+        },
+        objectives={"f": "MAXIMIZE"},
+    )
+
+    sim_specs = SimSpecs(
+        simulator=eval_func_multitask,
+        vocs=vocs,
+    )
+
+    alloc_specs = AllocSpecs(alloc_f=alloc_f)
+    exit_criteria = ExitCriteria(sim_max=15)
+
+    H0 = None  # or np.load("multitask_first_pass.npy")
+    for run_num in range(2):
+        print(f"\nRun number: {run_num}")
+        task1 = Task("task_1", n_init=2, n_opt=1)
+        task2 = Task("task_2", n_init=5, n_opt=3)
+        gen = AxMultitaskGenerator(vocs=vocs, hifi_task=task1, lofi_task=task2)
+
+        gen_specs = GenSpecs(
+            generator=gen,
+            batch_size=batch_size,
+            vocs=vocs,
+        )
+
+        workflow = Ensemble(
+            libE_specs=libE_specs,
+            sim_specs=sim_specs,
+            alloc_specs=alloc_specs,
+            gen_specs=gen_specs,
+            exit_criteria=exit_criteria,
+            H0=H0,
+        )
+
+        H, _, _ = workflow.run()
+
+        if run_num == 0:
+            H0 = H
+            workflow.save_output("multitask_first_pass", append_attrs=False)  # Allows restart-only run
+
+        if workflow.is_manager:
+            if run_num == 1:
+                workflow.save_output("multitask_with_H0")
+                print(f"Second run completed: {len(H)} simulations")
diff --git a/libensemble/tests/regression_tests/test_optimas_ax_sf.py b/libensemble/tests/regression_tests/test_optimas_ax_sf.py
new file mode 100644
index 000000000..ba0b66c29
--- /dev/null
+++ b/libensemble/tests/regression_tests/test_optimas_ax_sf.py
@@ -0,0 +1,83 @@
+"""
+Tests libEnsemble with Optimas Single-Fidelity Ax Generator
+
+*****currently fixing nworkers to batch_size*****
+
+Execute via one of the following commands (e.g. 4 workers):
+   mpiexec -np 5 python test_optimas_ax_sf.py
+   python test_optimas_ax_sf.py -n 4
+
+When running with the above commands, the number of concurrent evaluations of
+the objective function will be 4 as the generator is on the manager.
+
+"""
+
+# Do not change these lines - they are parsed by run-tests.sh
+# TESTSUITE_COMMS: mpi local
+# TESTSUITE_NPROCS: 4
+# TESTSUITE_EXTRA: true
+
+import numpy as np
+
+from gest_api.vocs import VOCS
+from optimas.generators import AxSingleFidelityGenerator
+
+from libensemble import Ensemble
+from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f
+from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs
+
+
+def eval_func_sf(input_params):
+    """Evaluation function for single-fidelity test."""
+    x0 = input_params["x0"]
+    x1 = input_params["x1"]
+    result = -(x0 + 10 * np.cos(x0)) * (x1 + 5 * np.cos(x1))
+    return {"f": result}
+
+
+# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
+if __name__ == "__main__":
+
+    n = 2
+    batch_size = 2
+
+    libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size)
+
+    vocs = VOCS(
+        variables={
+            "x0": [-50.0, 5.0],
+            "x1": [-5.0, 15.0],
+        },
+        objectives={"f": "MAXIMIZE"},
+    )
+
+    gen = AxSingleFidelityGenerator(vocs=vocs)
+
+    gen_specs = GenSpecs(
+        generator=gen,
+        batch_size=batch_size,
+        vocs=vocs,
+    )
+
+    sim_specs = SimSpecs(
+        simulator=eval_func_sf,
+        vocs=vocs,
+    )
+
+    alloc_specs = AllocSpecs(alloc_f=alloc_f)
+    exit_criteria = ExitCriteria(sim_max=10)
+
+    workflow = Ensemble(
+        libE_specs=libE_specs,
+        sim_specs=sim_specs,
+        alloc_specs=alloc_specs,
+        gen_specs=gen_specs,
+        exit_criteria=exit_criteria,
+    )
+
+    H, _, _ = workflow.run()
+
+    # Save results on the manager
+    if workflow.is_manager:
+        workflow.save_output(__file__)
+        print(f"Completed {len(H)} simulations")
diff --git a/libensemble/utils/misc.py b/libensemble/utils/misc.py
index 1c10a9d88..19b67f37e 100644
--- a/libensemble/utils/misc.py
+++ b/libensemble/utils/misc.py
@@ -77,10 +77,8 @@ def _get_new_dtype_fields(first: dict, mapping: dict = {}) -> list:
     new_dtype_names = [i for i in new_dtype_names if i not in fields_to_convert] + list(
         mapping.keys()
     )  # array dtype needs "x". avoid fields from mapping values since we're converting those to "x"
-
     # We need to accommodate "_id" getting mapped to "sim_id", but if it's not present
     # in the input dictionary, then perhaps we're doing an initial sample.
-    # I wonder if this loop is generalizable to other fields.
     if "_id" not in first and "sim_id" in mapping:
         new_dtype_names.remove("sim_id")
     return new_dtype_names
@@ -97,7 +95,7 @@ def _decide_dtype(name: str, entry, size: int) -> tuple:
         output_type = "U" + str(len(entry) + 1)
     else:
         output_type = type(entry)  # use default "python" type
-    if name == "sim_id":  # mapping seems to assume that sim_ids are interpretable as floats unless this...?
+    if name == "sim_id":
         output_type = int
     if size == 1 or not size:
         return (name, output_type)
@@ -124,12 +122,16 @@ def _pack_field(input_dict: dict, field_names: list) -> tuple:


 def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray:
+    """Convert list of dicts to numpy structured array"""
     if list_dicts is None:
         return None

-    if not isinstance(list_dicts, list):  # presumably already a numpy array, conversion not necessary
+    if not isinstance(list_dicts, list):
         return list_dicts

+    if not list_dicts:
+        return np.array([], dtype=dtype if dtype else [])
+
     # first entry is used to determine dtype
     first = list_dicts[0]
@@ -139,7 +141,7 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray:

     if (
         dtype is None
-    ):  # rather roundabout. I believe default value gets set upon function instantiation. (default is mutable!)
+    ):  # Default value gets set upon function instantiation (default is mutable).
         dtype = []

     # build dtype of non-mapped fields. appending onto empty dtype
@@ -148,9 +150,11 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray:

     # append dtype of mapped float fields
     if len(mapping):
+        existing_names = [f[0] for f in dtype]
         for name in mapping:
-            size = len(mapping[name])
-            dtype.append(_decide_dtype(name, 0.0, size))  # float
+            if name not in existing_names:
+                size = len(mapping[name])
+                dtype.append(_decide_dtype(name, 0.0, size))  # default to float

     out = np.zeros(len(list_dicts), dtype=dtype)
@@ -161,6 +165,7 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray:
                 out[output_name][j] = _pack_field(input_dict, input_names)
             else:
                 out[output_name][j] = _pack_field(input_dict, mapping[output_name])
+
     return out
@@ -215,6 +220,7 @@ def unmap_numpy_array(array: npt.NDArray, mapping: dict = {}) -> npt.NDArray:


 def np_to_list_dicts(array: npt.NDArray, mapping: dict = {}) -> List[dict]:
+    """Convert numpy structured array to list of dicts"""
     if array is None:
         return None
     out = []
@@ -240,9 +246,9 @@ def np_to_list_dicts(array: npt.NDArray, mapping: dict = {}) -> List[dict]:

         out.append(new_dict)

-    # exiting gen: convert sim_id to _id
+    # Remove _id from entries where it's -1 (unset)
     for entry in out:
-        if "sim_id" in entry:
-            entry["_id"] = entry.pop("sim_id")
+        if entry.get("_id") == -1:
+            entry.pop("_id")

     return out
diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py
index b0c78a7bc..0d96b099b 100644
--- a/libensemble/utils/runners.py
+++ b/libensemble/utils/runners.py
@@ -121,6 +121,9 @@ def _get_points_updates(self, batch_size: int) -> (npt.NDArray, npt.NDArray):
     def _convert_ingest(self, x: npt.NDArray) -> list:
         self.gen.ingest(np_to_list_dicts(x))

+    def _convert_initial_ingest(self, x: npt.NDArray) -> list:
+        self.gen.ingest(np_to_list_dicts(x, mapping=getattr(self.gen, "variables_mapping", {})))
+
     def _loop_over_gen(self, tag, Work, H_in):
         """Interact with suggest/ingest generator that *does not* contain a background thread"""
         while tag not in [PERSIS_STOP, STOP_TAG]:
@@ -139,12 +142,17 @@ def _get_initial_suggest(self, libE_info) -> npt.NDArray:

     def _start_generator_loop(self, tag, Work, H_in):
         """Start the generator loop after choosing best way of giving initial results to gen"""
-        self.gen.ingest(np_to_list_dicts(H_in, mapping=getattr(self.gen, "variables_mapping", {})))
+        self._convert_initial_ingest(H_in)
         return self._loop_over_gen(tag, Work, H_in)

     def _persistent_result(self, calc_in, persis_info, libE_info):
         """Setup comms with manager, setup gen, loop gen to completion, return gen's results"""
         self.ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
+
+        # If H0 exists, ingest it into the generator before initial suggest
+        if calc_in is not None and len(calc_in) > 0:
+            self._convert_initial_ingest(calc_in)
+
         # libE gens will hit the following line, but list_dicts_to_np will passthrough if the output is a numpy array
         H_out = list_dicts_to_np(
             self._get_initial_suggest(libE_info),
@@ -182,10 +190,8 @@ def _get_points_updates(self, batch_size: int) -> (npt.NDArray, list):

     def _convert_ingest(self, x: npt.NDArray) -> list:
         self.gen.ingest_numpy(x)

-    def _start_generator_loop(self, tag, Work, H_in) -> npt.NDArray:
-        """Start the generator loop after choosing best way of giving initial results to gen"""
-        self.gen.ingest_numpy(H_in)
-        return self._loop_over_gen(tag, Work, H_in)  # see parent class
+    def _convert_initial_ingest(self, x: npt.NDArray) -> list:
+        self.gen.ingest_numpy(x)


 class LibensembleGenThreadRunner(StandardGenRunner):