hpcflow.sdk.persistence package#

Submodules#

hpcflow.sdk.persistence.base module#

class hpcflow.sdk.persistence.base.PersistentStore(workflow)#

Bases: ABC

Parameters:

workflow (Workflow) –

add_EARs(task_idx, task_insert_ID, element_iter_idx, EARs, param_src_updates)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • element_iter_idx (int) –

  • EARs (Dict) –

  • param_src_updates (Dict) –

Return type:

None

add_element_iterations(task_idx, task_insert_ID, element_iterations, element_iters_idx)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • element_iterations (List[Dict]) –

  • element_iters_idx (Dict[int, List[int]]) –

Return type:

None

add_element_set(task_idx, element_set_js)#
Parameters:
  • task_idx (int) –

  • element_set_js (Dict) –

Return type:

None

add_elements(task_idx, task_insert_ID, elements, element_iterations)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • elements (List[Dict]) –

  • element_iterations (List[Dict]) –

Return type:

None

add_empty_task(task_idx, task_js)#
Parameters:
  • task_idx (int) –

  • task_js (Dict) –

Return type:

None

add_loop(task_indices, loop_js, iterable_parameters)#

Initialise the zeroth iterations of a named loop across the specified task subset.

Parameters:
  • task_indices (List[int]) – List of task indices that identifies the task subset over which the new loop should iterate.

  • loop_js (Dict) –

  • iterable_parameters (Dict[str, Dict]) –

Return type:

None

add_parameter_data(data, source)#
Parameters:
  • data (Any) –

  • source (Dict) –

Return type:

int

add_submission(submission_js)#

Add a new submission to the workflow.

Parameters:

submission_js (Dict) –

add_template_components(template_components)#
Parameters:

template_components (Dict) –

Return type:

None

add_unset_parameter_data(source)#
Parameters:

source (Dict) –

Return type:

int

append_submission_attempt(sub_idx, submitted_js_idx)#
Parameters:
  • sub_idx –

  • submitted_js_idx (int) –

Return type:

None

cached_load()#

Override this if a more performant implementation is possible.

For example, in a JSON persistent store, we need to load the whole document from disk to read anything from it, so we can temporarily cache the document if we know we will be making multiple reads.

Return type:

Iterator[None]

abstract check_parameters_exist(indices)#
Parameters:

indices (int | List[int]) –

Return type:

bool | List[bool]

clear_pending()#
Return type:

None

abstract commit_pending()#
Return type:

None

abstract copy(path=None)#

Make a copy of the store.

Parameters:

path (PathLike | None) –

Return type:

None

delete()#

Delete the persistent workflow.

Return type:

None

delete_no_confirm()#

Permanently delete the workflow data with no confirmation.

Return type:

None

abstract exists()#
Return type:

bool

property features#
abstract get_all_parameter_data()#
Return type:

Dict[int, Any]

abstract get_all_tasks_metadata()#
Return type:

List[Dict]

get_creation_info()#

Get information about the app that created the workflow.

abstract get_loops()#
Return type:

List[Dict]

abstract get_num_added_tasks()#

Get the total number of tasks ever added to the workflow, regardless of whether any of those tasks were subsequently removed from the workflow.

Return type:

int

abstract get_parameter_data(index)#
Parameters:

index (int) –

Return type:

Tuple[bool, Any]

abstract get_parameter_source(index)#
Parameters:

index (int) –

Return type:

Dict

abstract get_submissions()#
Return type:

List[Dict]

abstract get_task_elements(task_idx, task_insert_ID, selection)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • selection (slice) –

Return type:

List

get_task_elements_islice(task_idx, task_insert_ID, selection)#

Override this for a more performant implementation.

Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • selection (int | slice) –

Return type:

Iterator[Dict]

abstract get_template()#
Return type:

Dict

get_template_components()#

Get all template components, including pending.

Return type:

Dict

property has_pending: bool#

Returns True if there are pending changes that are not yet committed.

abstract is_modified_on_disk()#

Check if the workflow (metadata) has been modified on disk since initial load (this is bad).

Return type:

bool

abstract is_parameter_set(index)#
Parameters:

index (int) –

Return type:

bool

abstract classmethod path_has_store(path)#

Is a given workflow path of this store type?

abstract reinstate_replaced_dir()#
Return type:

None

reject_pending()#
Return type:

None

abstract remove_replaced_dir()#
Return type:

None

save()#
Return type:

None

set_EAR_end(task_insert_ID, element_iteration_idx, action_idx, run_idx)#
Parameters:
  • task_insert_ID (int) –

  • element_iteration_idx (int) –

  • action_idx (int) –

  • run_idx (int) –

Return type:

None

set_EAR_start(task_insert_ID, element_iteration_idx, action_idx, run_idx)#
Parameters:
  • task_insert_ID (int) –

  • element_iteration_idx (int) –

  • action_idx (int) –

  • run_idx (int) –

Return type:

None

set_EAR_submission_indices(sub_idx, EAR_indices)#
Parameters:
  • sub_idx (int) –

  • EAR_indices (Tuple[int, int, int, int]) –

Return type:

None

set_jobscript_job_ID(sub_idx, js_idx, job_ID)#
Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • job_ID (int) –

Return type:

None

set_jobscript_submit_time(sub_idx, js_idx, submit_time)#
Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • submit_time (datetime) –

Return type:

None

set_jobscript_version_info(sub_idx, js_idx, vers_info)#
Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • vers_info (Tuple) –

Return type:

None

abstract set_parameter(index, data)#

Set the value of a pre-allocated parameter.

Parameters:
  • index (int) –

  • data (Any) –

Return type:

None

property store_name#
abstract property store_path#

Get the store path, which may be the same as the workflow path.

property ts_fmt: str#
update_loop_num_added_iters(loop_idx, num_added_iters)#
Parameters:
  • loop_idx (int) –

  • num_added_iters (int) –

property workflow: Workflow#
property workflow_path: Path#
abstract classmethod write_empty_workflow(template_js, template_components_js, workflow_path, replaced_dir, creation_info)#
Parameters:
  • template_js (Dict) –

  • template_components_js (Dict) –

  • workflow_path (Path) –

  • replaced_dir (Path) –

  • creation_info (Dict) –

Return type:

None

class hpcflow.sdk.persistence.base.PersistentStoreFeatures(jobscript_parallelism=False, EAR_parallelism=False, schedulers=False, submission=False)#

Bases: object

Class to represent the features provided by a persistent store.

Parameters:
  • jobscript_parallelism (bool) – If True, the store supports workflows running multiple independent jobscripts simultaneously.

  • EAR_parallelism (bool) – If True, the store supports workflows running multiple EARs simultaneously.

  • schedulers (bool) – If True, the store supports submitting workflows to a scheduler.

  • submission (bool) – If True, the store supports submission. If False, the store can be considered to be an archive, which would need transforming to another store type before submission.

EAR_parallelism: bool = False#
jobscript_parallelism: bool = False#
schedulers: bool = False#
submission: bool = False#
hpcflow.sdk.persistence.base.dropbox_retry_fail(err)#
Parameters:

err (Exception) –

Return type:

None

hpcflow.sdk.persistence.base.remove_dir(dir_path)#

Try very hard to delete a directory.

Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation. This is exactly what happens when workflow creation fails (e.g. due to missing inputs). So, in addition to catching PermissionErrors generated when Dropbox has a lock on files, we repeatedly try deleting the directory tree.

Parameters:

dir_path (Path) –

Return type:

None

hpcflow.sdk.persistence.base.rename_dir(replaced_dir, original_dir)#
Return type:

None

hpcflow.sdk.persistence.json module#

class hpcflow.sdk.persistence.json.JSONPersistentStore(workflow)#

Bases: PersistentStore

A verbose but inefficient storage backend, to help with understanding and debugging.

Notes

We split the data across three JSON files to support submission to schedulers. During scheduler submission, if a task is quick, parameter data might be written at the same time as both submission metadata (jobscript submission time) and EAR metadata (EAR start/end time). Keeping these in separate files avoids simultaneous writes to a single file.

Parameters:

workflow (Workflow) –

cached_load()#

Context manager to cache the whole JSON document, allowing for multiple read operations with one disk read.

Return type:

Iterator[Dict]

check_parameters_exist(indices)#
Parameters:

indices (int | List[int]) –

Return type:

bool | List[bool]

commit_pending()#
Return type:

None

copy(path=None)#

Make a copy of the store.

Parameters:

path (PathLike | None) –

Return type:

None

exists()#
Return type:

bool

get_all_parameter_data()#
Return type:

Dict[int, Any]

get_all_tasks_metadata()#
Return type:

List[Dict]

get_loops()#
Return type:

List[Dict]

get_num_added_tasks()#

Get the total number of tasks ever added to the workflow, regardless of whether any of those tasks were subsequently removed from the workflow.

Return type:

int

get_parameter_data(index)#
Parameters:

index (int) –

Return type:

Tuple[bool, Any]

get_parameter_source(index)#
Parameters:

index (int) –

Return type:

Dict

get_submissions()#
Return type:

List[Dict]

get_task_elements(task_idx, task_insert_ID, selection, keep_iterations_idx=False)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • selection (slice) –

  • keep_iterations_idx (bool) –

Return type:

List[Dict]

get_task_idx_from_insert_ID(insert_ID)#
get_template()#
Return type:

Dict

is_modified_on_disk()#

Check if the workflow (metadata) has been modified on disk since initial load (this is bad).

Return type:

bool | Dict

is_parameter_set(index)#
Parameters:

index (int) –

Return type:

bool

load()#
Return type:

Dict

load_metadata()#
Return type:

Dict

load_parameter_data()#
Return type:

Dict

load_submissions()#
Return type:

Dict

classmethod path_has_store(path)#

Is a given workflow path of this store type?

reinstate_replaced_dir()#
Return type:

None

remove_replaced_dir()#
Return type:

None

set_parameter(index, data)#

Set the value of a pre-allocated parameter.

Parameters:
  • index (int) –

  • data (Any) –

Return type:

None

property store_path#

Get the store path, which may be the same as the workflow path.

classmethod write_empty_workflow(template_js, template_components_js, workflow_path, replaced_dir, creation_info)#
Parameters:
  • template_js (Dict) –

  • template_components_js (Dict) –

  • workflow_path (Path) –

  • replaced_dir (Path) –

  • creation_info (Dict) –

Return type:

None

hpcflow.sdk.persistence.zarr module#

class hpcflow.sdk.persistence.zarr.ZarrPersistentStore(workflow)#

Bases: PersistentStore

An efficient storage backend using Zarr that supports parameter-setting from multiple processes.

Parameters:

workflow (Workflow) –

add_EARs(task_idx, task_insert_ID, element_iter_idx, EARs, param_src_updates)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • element_iter_idx (int) –

  • EARs (Dict) –

  • param_src_updates (Dict) –

Return type:

None

add_element_iterations(task_idx, task_insert_ID, element_iterations, element_iters_idx)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • element_iterations (List[Dict]) –

  • element_iters_idx (Dict[int, List[int]]) –

Return type:

None

add_elements(task_idx, task_insert_ID, elements, element_iterations)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • elements (List[Dict]) –

  • element_iterations (List[Dict]) –

Return type:

None

cached_load()#

Context manager to cache the root attributes (i.e. metadata).

Return type:

Iterator[Dict]

check_parameters_exist(indices)#
Parameters:

indices (int | List[int]) –

Return type:

bool | List[bool]

commit_pending()#
Return type:

None

copy(path=None)#

Make a copy of the store.

Parameters:

path (PathLike | None) –

Return type:

None

exists()#
Return type:

bool

get_all_parameter_data()#
Return type:

Dict[int, Any]

get_all_tasks_metadata()#
Return type:

List[Dict]

get_loops()#
Return type:

List[Dict]

get_num_added_tasks()#

Get the total number of tasks ever added to the workflow, regardless of whether any of those tasks were subsequently removed from the workflow.

Return type:

int

get_parameter_data(index)#
Parameters:

index (int) –

Return type:

Tuple[bool, Any]

get_parameter_source(index)#
Parameters:

index (int) –

Return type:

Dict

get_submissions()#
Return type:

List[Dict]

get_task_elements(task_idx, task_insert_ID, selection, keep_iterations_idx=False)#
Parameters:
  • task_idx (int) –

  • task_insert_ID (int) –

  • selection (slice) –

  • keep_iterations_idx (bool) –

Return type:

List

get_template()#
Return type:

Dict

property has_pending: bool#

Returns True if there are pending changes that are not yet committed.

is_modified_on_disk()#

Check if the workflow (metadata) has been modified on disk since initial load (this is bad).

Return type:

bool

is_parameter_set(index)#
Parameters:

index (int) –

Return type:

bool

load_metadata()#
classmethod path_has_store(path)#

Is a given workflow path of this store type?

reinstate_replaced_dir()#
Return type:

None

reject_pending()#
Return type:

None

remove_replaced_dir()#
Return type:

None

set_parameter(index, data)#

Set the value of a pre-allocated parameter.

Parameters:
  • index (int) –

  • data (Any) –

Return type:

None

property store_path#

Get the store path, which may be the same as the workflow path.

classmethod write_empty_workflow(template_js, template_components_js, workflow_path, replaced_dir, creation_info)#
Parameters:
  • template_js (Dict) –

  • template_components_js (Dict) –

  • workflow_path (Path) –

  • replaced_dir (Path) –

  • creation_info (Dict) –

Return type:

None

Module contents#

hpcflow.sdk.persistence.store_cls_from_path(workflow_path)#
Parameters:

workflow_path (Path) –

Return type:

Type[PersistentStore]

hpcflow.sdk.persistence.store_cls_from_str(store_format)#
Parameters:

store_format (str) –

Return type:

Type[PersistentStore]

hpcflow.sdk.persistence.temporary_workflow_rename(path)#

Rename an existing same-path workflow directory so we can restore it if workflow creation fails.