hpcflow.app.Workflow#
- class hpcflow.app.Workflow(workflow_ref, store_fmt=None, fs_kwargs=None, **kwargs)#
Bases: Workflow
A concrete workflow.
- Parameters:
workflow_ref (str | Path | int) – Either the path to a persistent workflow, or an integer that will be interpreted as the local ID of a workflow submission, as reported by the app show command.
store_fmt (str | None) – The format of persistent store to use. Used to select the store manager class.
fs_kwargs (dict[str, Any] | None) – Additional arguments to pass when resolving a virtual workflow reference.
kwargs – For compatibility during pre-stable development phase.
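For example, an existing workflow can be opened either from its persistent path or from a local submission ID. This is a minimal sketch of the documented constructor, assuming hpcflow is installed (the import is deferred so the sketch itself stays self-contained):

```python
def open_workflow(workflow_ref):
    """Open an existing workflow from a persistent path or from a local
    submission ID (an integer, as reported by the app's `show` command)."""
    from hpcflow.app import Workflow  # deferred: requires hpcflow installed

    return Workflow(workflow_ref)
```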
Methods
Abort the currently running action-run of the specified task/element.
Add a loop to a subset of workflow tasks.
Add a job submission to this workflow.
Add a task to this workflow.
Add a new task after the specified task.
Add a new task before the specified task.
A context manager that batches up structural changes to the workflow and commits them to disk all together when the context manager exits.
Cancel any running jobscripts.
Check if a loop should terminate, given the specified completed run, and if so, set downstream iteration runs to be skipped.
Check if all the parameters exist.
Copy the workflow to a new path and return the copied workflow path.
Delete the persistent data.
Get the elements of the workflow's tasks.
Generate from a JSON file.
Generate from a JSON string.
Generate from a YAML file.
Generate from a YAML string.
Generate from either a YAML or JSON file, depending on the file extension.
Generate from a WorkflowTemplate object.
Generate from the data associated with a WorkflowTemplate object.
Get EAR IDs belonging to multiple tasks.
Check if an EAR is to be skipped.
Get element action run objects from a list of IDs.
Get EARs belonging to multiple tasks.
Get all runs in the workflow.
Get all iterations in the workflow.
Get all elements in the workflow.
Retrieve all workflow parameter data.
Retrieve all persistent parameters sources.
Retrieve all persistent parameters.
Get the run IDs of all submissions.
Get the element IDs of EARs.
Get the element iteration IDs of EARs.
Return element iteration objects from a list of IDs.
Get element iterations belonging to multiple tasks.
Return element objects from a list of IDs.
Retrieve the run IDs of those runs that correspond to the final action within a named loop iteration.
Get the iteration task pathway.
Get a description of what is going on with looping.
Get a single parameter.
Get the data relating to a parameter.
Get whether some parameters are set.
Get the source of a particular parameter.
Get parameter sources known to the workflow.
Get parameters known to the workflow.
Retrieve elements that are running according to the scheduler.
Retrieve runs that are running according to the scheduler.
Get the persistent element action runs.
Get the persistent element iterations.
Get the persistent elements.
Get the persistent tasks.
Get the task IDs of elements.
Get the elements of a task.
Return the unique names of all workflow tasks.
Test if a particular parameter is set.
Process the shell stdout/stderr stream according to the associated Command object.
Rechunk metadata/runs and parameters/base arrays, making them more efficient.
Reorganise the stored data chunks for parameters to be more efficient.
Reorganise the stored data chunks for EARs to be more efficient.
Reload the workflow from disk.
Resolve this workflow to a set of job scripts to run.
Save a parameter where an EAR can find it.
Set the end time and exit code on an EAR.
Record that an EAR is to be skipped due to an upstream failure or loop termination condition being met.
Set the start time on an EAR.
Set the submission index of an EAR.
Set EARs_initialised to True for the specified iteration.
Set the value of a parameter.
Print a description of the status of every element action run in the workflow.
Submit the workflow for execution.
Rename an existing same-path workflow (directory) so we can restore it if workflow creation fails.
Convert the workflow to an unzipped form.
Wait for the completion of specified/all submitted jobscripts.
Write run-time commands for a given EAR.
Convert the workflow to a zipped form.
Attributes
Path to artifacts of the workflow (temporary files, etc).
The creation descriptor for the workflow.
Path to the working directory for execution.
The ID of this workflow.
Path to input files for the workflow.
The loops in this workflow.
The name of the workflow.
The total number of element action runs.
The total number of added tasks.
The total number of element iterations.
The total number of elements.
The total number of loops.
The total number of job submissions.
The total number of tasks.
The format of the workflow's persistent store.
The job submissions done by this workflow.
Path to submission data for this workflow.
Path to artifacts of tasks.
The tasks in this workflow.
The template that this workflow was made from.
The template components used for this workflow.
The timestamp format.
The timestamp format for names.
An fsspec URL for this workflow.
- abort_run(submission_idx=-1, task_idx=None, task_insert_ID=None, element_idx=None)#
Abort the currently running action-run of the specified task/element.
- add_loop(loop)#
Add a loop to a subset of workflow tasks.
- Parameters:
loop (Loop) –
- Return type:
None
- add_submission(tasks=None, JS_parallelism=None)#
Add a job submission to this workflow.
- Parameters:
- Return type:
Submission | None
- add_task(task, new_index=None)#
Add a task to this workflow.
- add_task_after(new_task, task_ref=None)#
Add a new task after the specified task.
- add_task_before(new_task, task_ref=None)#
Add a new task before the specified task.
- batch_update(is_workflow_creation=False)#
A context manager that batches up structural changes to the workflow and commits them to disk all together when the context manager exits.
- Parameters:
is_workflow_creation (bool) –
- Return type:
Iterator[None]
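A typical use of batch_update is to group several structural changes so they are committed to disk together when the context manager exits. A hedged sketch, assuming a workflow object and task objects constructed elsewhere:

```python
def add_tasks_batched(workflow, tasks):
    """Add several tasks inside a single batch_update context, so the
    structural changes are committed to disk together on exit."""
    with workflow.batch_update():
        for task in tasks:
            workflow.add_task(task)
```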
- check_loop_termination(loop_name, run_ID)#
Check if a loop should terminate, given the specified completed run, and if so, set downstream iteration runs to be skipped.
- check_parameters_exist(id_lst)#
Check if all the parameters exist.
- copy(path='.')#
Copy the workflow to a new path and return the copied workflow path.
- property creation_info: CreationInfo#
The creation descriptor for the workflow.
- delete()#
Delete the persistent data.
- Return type:
None
- classmethod from_JSON_file(JSON_path, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None, status=None)#
Generate from a JSON file.
- Parameters:
JSON_path (PathLike) – The path to a workflow template in the JSON file format.
path (PathLike) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (dict[str, Any] | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (dict[str, str] | None) – String variables to substitute in the file given by JSON_path.
status (Status | None) –
- Return type:
- classmethod from_JSON_string(JSON_str, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None, status=None)#
Generate from a JSON string.
- Parameters:
JSON_str (str) – The JSON string containing a workflow template parametrisation.
path (PathLike) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (dict[str, Any] | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (dict[str, str] | None) – String variables to substitute in the string JSON_str.
status (Status | None) –
- Return type:
- classmethod from_YAML_file(YAML_path, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None)#
Generate from a YAML file.
- Parameters:
YAML_path (PathLike) – The path to a workflow template in the YAML file format.
path (PathLike) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (dict[str, Any] | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (dict[str, str] | None) – String variables to substitute in the file given by YAML_path.
- Return type:
- classmethod from_YAML_string(YAML_str, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None)#
Generate from a YAML string.
- Parameters:
YAML_str (str) – The YAML string containing a workflow template parametrisation.
path (PathLike) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (dict[str, Any] | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (dict[str, str] | None) – String variables to substitute in the string YAML_str.
- Return type:
- classmethod from_file(template_path, template_format=None, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None, status=None)#
Generate from either a YAML or JSON file, depending on the file extension.
- Parameters:
template_path (PathLike) – The path to a template file in YAML or JSON format, and with a “.yml”, “.yaml”, or “.json” extension.
template_format (Literal['json', 'yaml'] | None) – If specified, one of “json” or “yaml”. This forces parsing from a particular format regardless of the file extension.
path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (dict[str, Any] | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (dict[str, str] | None) – String variables to substitute in the file given by template_path.
status (Status | None) –
- Return type:
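A workflow can be generated from a template file without knowing its format in advance; from_file infers YAML or JSON from the extension unless template_format forces one. A minimal sketch under the documented signature (requires hpcflow to be installed; the import is deferred so the sketch stays self-contained):

```python
def make_workflow_from_template(template_path, out_dir):
    """Generate a persistent workflow from a YAML or JSON template file.

    The parse format is inferred from the file extension (".yml", ".yaml",
    or ".json") unless forced via `template_format`.
    """
    from hpcflow.app import Workflow  # deferred: requires hpcflow installed

    return Workflow.from_file(
        template_path,
        path=out_dir,
        store="zarr",     # the default persistent store format
        overwrite=False,  # fail rather than clobber an existing directory
    )
```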
- classmethod from_template(template, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, status=None)#
Generate from a WorkflowTemplate object.
- Parameters:
template (WorkflowTemplate) – The WorkflowTemplate object to make persistent.
path (PathLike) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (dict[str, Any] | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
status (Status | None) –
- Return type:
- classmethod from_template_data(template_name, tasks=None, loops=None, resources=None, path=None, workflow_name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None)#
Generate from the data associated with a WorkflowTemplate object.
- Parameters:
template_name (str) – Name of the new workflow template, from which the new workflow will be generated.
tasks (list[Task] | None) – List of Task objects to add to the new workflow.
loops (list[Loop] | None) – List of Loop objects to add to the new workflow.
resources (Resources) – Mapping of action scopes to resource requirements, to be applied to all element sets in the workflow. Resources specified in an element set take precedence over those defined here for the whole workflow.
path (PathLike | None) – The directory in which the workflow will be generated. The current directory if not specified.
workflow_name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified template_name will be used, in combination with a date-timestamp.
overwrite (bool) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (dict[str, Any] | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
- Return type:
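Where no template file exists, a workflow can instead be built directly from in-memory task and loop objects via from_template_data. A hedged sketch of this creation path (requires hpcflow to be installed; the import is deferred so the sketch stays self-contained):

```python
def make_workflow_from_data(template_name, tasks, loops=None):
    """Build a persistent workflow directly from Task/Loop objects, without
    first constructing a WorkflowTemplate yourself."""
    from hpcflow.app import Workflow  # deferred: requires hpcflow installed

    return Workflow.from_template_data(
        template_name=template_name,
        tasks=tasks,   # list of Task objects to add to the new workflow
        loops=loops,   # optional list of Loop objects
    )
```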
- get_EAR_IDs_of_tasks(id_lst)#
Get EAR IDs belonging to multiple tasks.
- get_EAR_skipped(EAR_ID)#
Check if an EAR is to be skipped.
- get_EARs_from_IDs(ids)#
Get element action run objects from a list of IDs.
- Parameters:
- Return type:
- get_EARs_of_tasks(id_lst)#
Get EARs belonging to multiple tasks.
- Parameters:
id_lst (Iterable[int]) –
- Return type:
Iterator[ElementActionRun]
- get_all_EARs()#
Get all runs in the workflow.
- Return type:
- get_all_element_iterations()#
Get all iterations in the workflow.
- Return type:
- get_all_parameter_data(**kwargs)#
Retrieve all workflow parameter data.
- get_all_parameter_sources(**kwargs)#
Retrieve all persistent parameters sources.
- Return type:
- get_all_parameters(**kwargs)#
Retrieve all persistent parameters.
- Keyword Arguments:
dataset_copy (bool) – For Zarr stores only. If True, copy arrays as NumPy arrays.
- Return type:
- get_element_IDs_from_EAR_IDs(id_lst)#
Get the element IDs of EARs.
- get_element_iteration_IDs_from_EAR_IDs(id_lst)#
Get the element iteration IDs of EARs.
- get_element_iterations_from_IDs(id_lst)#
Return element iteration objects from a list of IDs.
- Parameters:
id_lst (Iterable[int]) –
- Return type:
- get_element_iterations_of_tasks(id_lst)#
Get element iterations belonging to multiple tasks.
- Parameters:
id_lst (Iterable[int]) –
- Return type:
Iterator[ElementIteration]
- get_elements_from_IDs(id_lst)#
Return element objects from a list of IDs.
- get_iteration_final_run_IDs(id_lst=None)#
Retrieve the run IDs of those runs that correspond to the final action within a named loop iteration.
These runs represent the final action of a given element-iteration; this is used to identify which commands file to append a loop-termination check to.
- get_iteration_task_pathway(ret_iter_IDs=False, ret_data_idx=False)#
Get the iteration task pathway.
- get_loop_map(id_lst=None)#
Get a description of what is going on with looping.
- get_parameter(index, **kwargs)#
Get a single parameter.
- Parameters:
index (int) – The index of the parameter to retrieve.
- Keyword Arguments:
dataset_copy (bool) – For Zarr stores only. If True, copy arrays as NumPy arrays.
- Return type:
- get_parameter_data(index, **kwargs)#
Get the data relating to a parameter.
- Parameters:
index (int) –
- Return type:
Any
- get_parameter_set_statuses(id_lst)#
Get whether some parameters are set.
- get_parameter_source(index)#
Get the source of a particular parameter.
- Parameters:
index (int) –
- Return type:
- get_parameter_sources(id_lst)#
Get parameter sources known to the workflow.
- Parameters:
id_lst (Iterable[int]) –
- Return type:
- get_parameters(id_lst, **kwargs)#
Get parameters known to the workflow.
- Parameters:
id_lst (Iterable[int]) – The indices of the parameters to retrieve.
- Keyword Arguments:
dataset_copy (bool) – For Zarr stores only. If True, copy arrays as NumPy arrays.
- Return type:
Sequence[StoreParameter]
- get_running_elements(submission_idx=-1, task_idx=None, task_insert_ID=None)#
Retrieve elements that are running according to the scheduler.
- get_running_runs(submission_idx=-1, task_idx=None, task_insert_ID=None, element_idx=None)#
Retrieve runs that are running according to the scheduler.
- get_store_EARs(id_lst)#
Get the persistent element action runs.
- get_store_element_iterations(id_lst)#
Get the persistent element iterations.
- Parameters:
id_lst (Iterable[int]) –
- Return type:
Sequence[StoreElementIter]
- get_store_elements(id_lst)#
Get the persistent elements.
- Parameters:
id_lst (Iterable[int]) –
- Return type:
Sequence[StoreElement]
- get_store_tasks(id_lst)#
Get the persistent tasks.
- get_task_IDs_from_element_IDs(id_lst)#
Get the task IDs of elements.
- get_task_elements(task, idx_lst=None)#
Get the elements of a task.
- Parameters:
task (WorkflowTask) –
- Return type:
- get_task_unique_names(map_to_insert_ID=False)#
Return the unique names of all workflow tasks.
- is_parameter_set(index)#
Test if a particular parameter is set.
- property loops: WorkflowLoopList#
The loops in this workflow.
- property name: str#
The name of the workflow.
The workflow name may be different from the template name, as it includes the creation date-timestamp if generated.
- process_shell_parameter_output(name, value, EAR_ID, cmd_idx, stderr=False)#
Process the shell stdout/stderr stream according to the associated Command object.
- rechunk(chunk_size=None, backup=True, status=True)#
Rechunk metadata/runs and parameters/base arrays, making them more efficient.
- rechunk_parameter_base(chunk_size=None, backup=True, status=True)#
Reorganise the stored data chunks for parameters to be more efficient.
- rechunk_runs(chunk_size=None, backup=True, status=True)#
Reorganise the stored data chunks for EARs to be more efficient.
- reload()#
Reload the workflow from disk.
- Return type:
Self
- resolve_jobscripts(tasks=None)#
Resolve this workflow to a set of job scripts to run.
- save_parameter(name, value, EAR_ID)#
Save a parameter where an EAR can find it.
- set_EAR_end(js_idx, js_act_idx, EAR_ID, exit_code)#
Set the end time and exit code on an EAR.
If the exit code is non-zero, also set all downstream dependent EARs to be skipped. Also save any generated input/output files.
- set_EAR_skip(EAR_ID)#
Record that an EAR is to be skipped due to an upstream failure or loop termination condition being met.
- Parameters:
EAR_ID (int) –
- Return type:
None
- set_EAR_submission_index(EAR_ID, sub_idx)#
Set the submission index of an EAR.
- set_EARs_initialised(iter_ID)#
Set EARs_initialised to True for the specified iteration.
- Parameters:
iter_ID (int) –
- Return type:
None
- set_parameter_value(param_id, value, commit=False)#
Set the value of a parameter.
- show_all_EAR_statuses()#
Print a description of the status of every element action run in the workflow.
- Return type:
None
- property submissions: list[Submission]#
The job submissions done by this workflow.
- submit(*, ignore_errors=False, JS_parallelism=None, print_stdout=False, wait=False, add_to_known=True, return_idx=False, tasks=None, cancel=False, status=True)#
Submit the workflow for execution.
- Parameters:
ignore_errors (bool) – If True, ignore jobscript submission errors. If False (the default) jobscript submission will halt when a jobscript fails to submit.
JS_parallelism (bool | None) – If True, allow multiple jobscripts to execute simultaneously. Raises if set to True but the store type does not support the jobscript_parallelism feature. If not set, jobscript parallelism will be used if the store type supports it.
print_stdout (bool) – If True, print any jobscript submission standard output, otherwise hide it.
wait (bool) – If True, this command will block until the workflow execution is complete.
add_to_known (bool) – If True, add the submitted submissions to the known-submissions file, which is used by the show command to monitor current and recent submissions.
return_idx (bool) – If True, return a dict representing the jobscript indices submitted for each submission.
tasks (list[int] | None) – List of task indices to include in the new submission if no submissions already exist. By default all tasks are included if a new submission is created.
cancel (bool) – Immediately cancel the submission. Useful for testing and benchmarking.
status (bool) – If True, display a live status to track submission progress.
- Return type:
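A common submission pattern is to submit all tasks, block until execution completes, and keep the submission visible to the app's show command. A sketch using only the documented keyword arguments, assuming a workflow object constructed elsewhere:

```python
def submit_and_wait(workflow):
    """Submit the workflow and block until execution is complete."""
    return workflow.submit(
        wait=True,          # block until the workflow execution finishes
        add_to_known=True,  # record in the known-submissions file for `show`
        status=True,        # display a live status while submitting
    )
```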
- property tasks: WorkflowTaskList#
The tasks in this workflow.
- property template: WorkflowTemplate#
The template that this workflow was made from.
- property template_components: TemplateComponents#
The template components used for this workflow.
- classmethod temporary_rename(path, fs)#
Rename an existing same-path workflow (directory) so we can restore it if workflow creation fails.
Renaming is retried until it succeeds. This means multiple new paths may be created, where only the final path should be considered the successfully renamed workflow. Other paths will be deleted.
- Parameters:
path (str) –
fs (AbstractFileSystem) –
- Return type:
- unzip(path='.', *, log=None)#
Convert the workflow to an unzipped form.
- wait(sub_js=None)#
Wait for the completion of specified/all submitted jobscripts.
- write_commands(submission_idx, jobscript_idx, JS_action_idx, EAR_ID)#
Write run-time commands for a given EAR.
- zip(path='.', *, log=None, overwrite=False, include_execute=False, include_rechunk_backups=False)#
Convert the workflow to a zipped form.
- Parameters:
path (str) – Path at which to create the new zipped workflow. If this is an existing directory, the zip file will be created within this directory. Otherwise, this path is assumed to be the full file path to the new zip file.
log (str | None) –
overwrite (bool) –
include_execute (bool) –
include_rechunk_backups (bool) –
- Return type:
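Zipping is useful for archiving or transferring a workflow; if the given path is an existing directory the zip file is created inside it. A hedged sketch using the documented keyword arguments, assuming a workflow object constructed elsewhere:

```python
def archive_workflow(workflow, dest="."):
    """Convert a workflow to a zipped form for transfer or archival."""
    return workflow.zip(
        dest,
        overwrite=False,
        include_execute=False,          # omit the execution directory
        include_rechunk_backups=False,  # omit rechunk backup arrays
    )
```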