hpcflow.app.Workflow#

class hpcflow.app.Workflow(path, store_fmt=None, fs_kwargs=None)#

Bases: Workflow

Methods

add_loop

Add a loop to a subset of workflow tasks.

add_submission

add_task

add_task_after

Add a new task after the specified task.

add_task_before

Add a new task before the specified task.

batch_update

A context manager that batches up structural changes to the workflow and commits them to disk all together when the context manager exits.

cancel

Cancel any running jobscripts.

check_parameters_exist

copy

Copy the workflow to a new path and return the copied workflow path.

delete

elements

from_JSON_file

Generate from a JSON file.

from_JSON_string

Generate from a JSON string.

from_YAML_file

Generate from a YAML file.

from_YAML_string

Generate from a YAML string.

from_file

Generate from either a YAML or JSON file, depending on the file extension.

from_template

Generate from a WorkflowTemplate object.

from_template_data

Generate from the data associated with a WorkflowTemplate object.

get_EAR_IDs_of_tasks

Get EAR IDs belonging to multiple tasks.

get_EAR_skipped

Check if an EAR is to be skipped.

get_EARs_from_IDs

Return element action run objects from a list of IDs.

get_EARs_of_tasks

Get EARs belonging to multiple tasks.

get_all_EARs

get_all_element_iterations

get_all_elements

get_all_parameter_data

Retrieve all workflow parameter data.

get_all_parameters

Retrieve all store parameters.

get_element_IDs_from_EAR_IDs

get_element_iteration_IDs_from_EAR_IDs

get_element_iterations_from_IDs

Return element iteration objects from a list of IDs.

get_element_iterations_of_tasks

Get element iterations belonging to multiple tasks.

get_elements_from_IDs

Return element objects from a list of IDs.

get_iteration_task_pathway

get_parameter

get_parameter_data

get_parameter_set_statuses

get_parameter_source

get_parameter_sources

get_parameters

get_store_EARs

get_store_element_iterations

get_store_elements

get_store_tasks

get_task_IDs_from_element_IDs

get_task_elements

get_task_unique_names

Return the unique names of all workflow tasks.

is_parameter_set

resolve_jobscripts

save_parameter

set_EAR_end

Set the end time and exit code on an EAR.

set_EAR_skip

Record that an EAR is to be skipped due to an upstream failure.

set_EAR_start

Set the start time on an EAR.

set_EAR_submission_index

Set the submission index of an EAR.

set_parameter_value

show_all_EAR_statuses

submit

temporary_rename

Rename an existing same-path workflow (directory) so we can restore it if workflow creation fails.

to_zip

wait

Wait for the completion of specified/all submitted jobscripts.

write_commands

Write run-time commands for a given EAR.

Attributes

app

artifacts_path

creation_info

execution_path

fs_path

id_

input_files_path

loops

name

The workflow name may be different from the template name, as it includes the creation date-timestamp if generated.

num_EARs

num_added_tasks

num_element_iterations

num_elements

num_loops

num_submissions

num_tasks

store_format

submissions

submissions_path

task_artifacts_path

tasks

template

template_components

ts_fmt

ts_name_fmt

Parameters:
  • path (Union[str, Path]) –

  • store_fmt (Optional[str]) –

  • fs_kwargs (Optional[Dict]) –

add_loop(loop, parent_loop_indices=None)#

Add a loop to a subset of workflow tasks.

Parameters:
  • loop (Loop) –

  • parent_loop_indices (Dict) –

Return type:

None

add_submission(JS_parallelism=None)#
Parameters:

JS_parallelism (bool | None) –

Return type:

Submission

add_task(task, new_index=None)#
Parameters:
  • task (Task) –

  • new_index (int | None) –

Return type:

None

add_task_after(new_task, task_ref=None)#

Add a new task after the specified task.

Parameters:
  • task_ref (Task) – If not given, the new task will be added at the end of the workflow.

  • new_task (Task) –

Return type:

None

add_task_before(new_task, task_ref=None)#

Add a new task before the specified task.

Parameters:
  • task_ref (Task) – If not given, the new task will be added at the beginning of the workflow.

  • new_task (Task) –

Return type:

None

app = BaseApp(name='hpcFlow', version='0.2.0a76')#
property artifacts_path#
batch_update(is_workflow_creation=False)#

A context manager that batches up structural changes to the workflow and commits them to disk all together when the context manager exits.

Parameters:

is_workflow_creation (bool) –

Return type:

Iterator[None]

cancel(hard=False)#

Cancel any running jobscripts.

check_parameters_exist(id_lst)#
Parameters:

id_lst (int | List[int]) –

Return type:

bool | List[bool]

copy(path=None)#

Copy the workflow to a new path and return the copied workflow path.

Return type:

str

property creation_info#
delete()#
elements()#
Return type:

Iterator[Element]

property execution_path#
classmethod from_JSON_file(JSON_path, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a JSON file.

Parameters:
  • JSON_path (PathLike) – The path to a workflow template in the JSON file format.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_JSON_string(JSON_str, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a JSON string.

Parameters:
  • JSON_str (PathLike) – The JSON string containing a workflow template parametrisation.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_YAML_file(YAML_path, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a YAML file.

Parameters:
  • YAML_path (PathLike) – The path to a workflow template in the YAML file format.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_YAML_string(YAML_str, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a YAML string.

Parameters:
  • YAML_str (PathLike) – The YAML string containing a workflow template parametrisation.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_file(template_path, template_format=None, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from either a YAML or JSON file, depending on the file extension.

Parameters:
  • template_path (PathLike) – The path to a template file in YAML or JSON format, and with a “.yml”, “.yaml”, or “.json” extension.

  • template_format (str | None) – If specified, one of “json” or “yaml”. This forces parsing from a particular format regardless of the file extension.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_template(template, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a WorkflowTemplate object.

Parameters:
  • template (WorkflowTemplate) – The WorkflowTemplate object to make persistent.

  • path (PathLike | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_template_data(template_name, tasks=None, loops=None, resources=None, path=None, workflow_name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from the data associated with a WorkflowTemplate object.

Parameters:
  • template_name (str) – Name of the new workflow template, from which the new workflow will be generated.

  • tasks (List[Task] | None) – List of Task objects to add to the new workflow.

  • loops (List[Loop] | None) – List of Loop objects to add to the new workflow.

  • resources (Dict[str, Dict] | None) – Mapping of action scopes to resource requirements, to be applied to all element sets in the workflow. Resources specified in an element set take precedence over those defined here for the whole workflow.

  • path (PathLike | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • workflow_name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with workflow_name. If not specified, template_name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + workflow_name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

property fs_path#
get_EAR_IDs_of_tasks(id_lst)#

Get EAR IDs belonging to multiple tasks.

Parameters:

id_lst (int) –

Return type:

List[int]

get_EAR_skipped(EAR_ID)#

Check if an EAR is to be skipped.

Parameters:

EAR_ID (int) –

Return type:

None

get_EARs_from_IDs(id_lst)#

Return element action run objects from a list of IDs.

Parameters:

id_lst (Iterable[int]) –

Return type:

List[ElementActionRun]

get_EARs_of_tasks(id_lst)#

Get EARs belonging to multiple tasks.

Parameters:

id_lst (Iterable[int]) –

Return type:

List[ElementActionRun]

get_all_EARs()#
Return type:

List[ElementActionRun]

get_all_element_iterations()#
Return type:

List[ElementIteration]

get_all_elements()#
Return type:

List[Element]

get_all_parameter_data(**kwargs)#

Retrieve all workflow parameter data.

Parameters:

kwargs (Dict) –

Return type:

Dict[int, Any]

get_all_parameters(**kwargs)#

Retrieve all store parameters.

Parameters:

kwargs (Dict) –

Return type:

List[AnySParameter]

get_element_IDs_from_EAR_IDs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[int]

get_element_iteration_IDs_from_EAR_IDs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[int]

get_element_iterations_from_IDs(id_lst)#

Return element iteration objects from a list of IDs.

Parameters:

id_lst (Iterable[int]) –

Return type:

List[ElementIteration]

get_element_iterations_of_tasks(id_lst)#

Get element iterations belonging to multiple tasks.

Parameters:

id_lst (Iterable[int]) –

Return type:

List[ElementIteration]

get_elements_from_IDs(id_lst)#

Return element objects from a list of IDs.

Parameters:

id_lst (Iterable[int]) –

Return type:

List[Element]

get_iteration_task_pathway()#
get_parameter(index, **kwargs)#
Parameters:
  • index (int) –

  • kwargs (Dict) –

Return type:

AnySParameter

get_parameter_data(index, **kwargs)#
Parameters:
  • index (int) –

  • kwargs (Dict) –

Return type:

Any

get_parameter_set_statuses(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[bool]

get_parameter_source(index)#
Parameters:

index (int) –

Return type:

Dict

get_parameter_sources(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[Dict]

get_parameters(id_lst, **kwargs)#
Parameters:
  • id_lst (Iterable[int]) –

  • kwargs (Dict) –

Return type:

List[AnySParameter]

get_store_EARs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySEAR]

get_store_element_iterations(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySElementIter]

get_store_elements(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySElement]

get_store_tasks(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySTask]

get_task_IDs_from_element_IDs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[int]

get_task_elements(task, selection)#
Parameters:
  • task –

  • selection –

Return type:

List[Element]

get_task_unique_names(map_to_insert_ID=False)#

Return the unique names of all workflow tasks.

Parameters:

map_to_insert_ID (bool, optional) – If True, return a dict whose values are task insert IDs, otherwise return a list.

Return type:

List[str] | Dict[str, int]

property id_#
property input_files_path#
is_parameter_set(index)#
Parameters:

index (int) –

Return type:

bool

property loops: WorkflowLoopList#
property name#

The workflow name may be different from the template name, as it includes the creation date-timestamp if generated.

property num_EARs#
property num_added_tasks: int#
property num_element_iterations#
property num_elements#
property num_loops: int#
property num_submissions#
property num_tasks#
resolve_jobscripts()#
Return type:

List[Jobscript]

save_parameter(name, value, EAR_ID)#
Parameters:
  • name (str) –

  • value (Any) –

  • EAR_ID (int) –

set_EAR_end(js_idx, js_act_idx, EAR_ID, exit_code)#

Set the end time and exit code on an EAR.

If the exit code is non-zero, also set all downstream dependent EARs to be skipped. Also save any generated input/output files.

Parameters:
  • js_idx (int) –

  • js_act_idx (int) –

  • EAR_ID (int) –

  • exit_code (int) –

Return type:

None

set_EAR_skip(EAR_ID)#

Record that an EAR is to be skipped due to an upstream failure.

Parameters:

EAR_ID (int) –

Return type:

None

set_EAR_start(EAR_ID)#

Set the start time on an EAR.

Parameters:

EAR_ID (int) –

Return type:

None

set_EAR_submission_index(EAR_ID, sub_idx)#

Set the submission index of an EAR.

Parameters:
  • EAR_ID (int) –

  • sub_idx (int) –

Return type:

None

set_parameter_value(param_id, value, commit=False)#
Parameters:
  • param_id (int) –

  • value (Any) –

  • commit (bool) –

Return type:

None

show_all_EAR_statuses()#
property store_format#
property submissions: List[Submission]#
property submissions_path#
submit(ignore_errors=False, JS_parallelism=None, print_stdout=False, wait=False)#
Parameters:
  • ignore_errors (bool | None) –

  • JS_parallelism (bool | None) –

  • print_stdout (bool | None) –

  • wait (bool) –

Return type:

Dict[int, int]

property task_artifacts_path#
property tasks: WorkflowTaskList#
property template: WorkflowTemplate#
property template_components: Dict#
classmethod temporary_rename(path, fs)#

Rename an existing same-path workflow (directory) so we can restore it if workflow creation fails.

Renaming will be retried until it completes successfully. This means multiple new paths may be created, where only the final path should be considered the successfully renamed workflow. Other paths will be deleted.

Parameters:

path (str) –

Return type:

List[str]

to_zip(log=None)#
Return type:

str

property ts_fmt#
property ts_name_fmt#
wait(sub_js=None)#

Wait for the completion of specified/all submitted jobscripts.

Parameters:

sub_js (Dict | None) –

write_commands(submission_idx, jobscript_idx, JS_action_idx, EAR_ID)#

Write run-time commands for a given EAR.

Parameters:
  • submission_idx (int) –

  • jobscript_idx (int) –

  • JS_action_idx (int) –

  • EAR_ID (int) –

Return type:

None