hpcflow.sdk.persistence package#
Submodules#
hpcflow.sdk.persistence.base module#
- class hpcflow.sdk.persistence.base.PersistentStore(workflow)#
Bases:
ABC
- Parameters:
workflow (Workflow) –
- add_EARs(task_idx, task_insert_ID, element_iter_idx, EARs, param_src_updates)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
element_iter_idx (int) –
EARs (Dict) –
param_src_updates (Dict) –
- Return type:
None
- add_element_iterations(task_idx, task_insert_ID, element_iterations, element_iters_idx)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
element_iterations (List[Dict]) –
element_iters_idx (Dict[int, List[int]]) –
- Return type:
None
- add_element_set(task_idx, element_set_js)#
- Parameters:
task_idx (int) –
element_set_js (Dict) –
- Return type:
None
- add_elements(task_idx, task_insert_ID, elements, element_iterations)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
elements (List[Dict]) –
element_iterations (List[Dict]) –
- Return type:
None
- add_empty_task(task_idx, task_js)#
- Parameters:
task_idx (int) –
task_js (Dict) –
- Return type:
None
- add_loop(task_indices, loop_js, iterable_parameters)#
Initialise the zeroth iterations of a named loop across the specified task subset.
- Parameters:
task_indices (List[int]) – List of task indices that identifies the task subset over which the new loop should iterate.
loop_js (Dict) –
iterable_parameters (Dict[str, Dict]) –
- Return type:
None
- add_parameter_data(data, source)#
- Parameters:
data (Any) –
source (Dict) –
- Return type:
int
- add_submission(submission_js)#
Add a new submission to the workflow.
- Parameters:
submission_js (Dict) –
- add_template_components(template_components)#
- Parameters:
template_components (Dict) –
- Return type:
None
- add_unset_parameter_data(source)#
- Parameters:
source (Dict) –
- Return type:
int
- append_submission_attempt(sub_idx, submitted_js_idx)#
- Parameters:
sub_idx –
submitted_js_idx (int) –
- Return type:
None
- cached_load()#
Override this if a more performant implementation is possible.
For example, in a JSON persistent store, we need to load the whole document from disk to read anything from it, so we can temporarily cache the document if we know we will be making multiple reads.
- Return type:
Iterator[None]
- abstract check_parameters_exist(indices)#
- Parameters:
indices (int | List[int]) –
- Return type:
bool | List[bool]
- clear_pending()#
- Return type:
None
- abstract commit_pending()#
- Return type:
None
- abstract copy(path=None)#
Make a copy of the store.
- Parameters:
path (PathLike | None) –
- Return type:
None
- delete()#
Delete the persistent workflow.
- Return type:
None
- delete_no_confirm()#
Permanently delete the workflow data with no confirmation.
- Return type:
None
- abstract exists()#
- Return type:
bool
- property features#
- abstract get_all_parameter_data()#
- Return type:
Dict[int, Any]
- abstract get_all_tasks_metadata()#
- Return type:
List[Dict]
- get_creation_info()#
Get information about the app that created the workflow.
- abstract get_loops()#
- Return type:
List[Dict]
- abstract get_num_added_tasks()#
Get the total number of tasks ever added to the workflow, regardless of whether any of those tasks were subsequently removed from the workflow.
- Return type:
int
- abstract get_parameter_data(index)#
- Parameters:
index (int) –
- Return type:
Tuple[bool, Any]
- abstract get_parameter_source(index)#
- Parameters:
index (int) –
- Return type:
Dict
- abstract get_submissions()#
- Return type:
List[Dict]
- abstract get_task_elements(task_idx, task_insert_ID, selection)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
selection (slice) –
- Return type:
List
- get_task_elements_islice(task_idx, task_insert_ID, selection)#
Override this for a more performant implementation.
- Parameters:
task_idx (int) –
task_insert_ID (int) –
selection (int | slice) –
- Return type:
Iterator[Dict]
- abstract get_template()#
- Return type:
Dict
- get_template_components()#
Get all template components, including pending.
- Return type:
Dict
- property has_pending: bool#
Returns True if there are pending changes that are not yet committed.
- abstract is_modified_on_disk()#
Check if the workflow (metadata) has been modified on disk since initial load (this is bad).
- Return type:
bool
- abstract is_parameter_set(index)#
- Parameters:
index (int) –
- Return type:
bool
- abstract classmethod path_has_store(path)#
Is a given workflow path of this store type?
- abstract reinstate_replaced_dir()#
- Return type:
None
- reject_pending()#
- Return type:
None
- abstract remove_replaced_dir()#
- Return type:
None
- save()#
- Return type:
None
- set_EAR_end(task_insert_ID, element_iteration_idx, action_idx, run_idx)#
- Parameters:
task_insert_ID (int) –
element_iteration_idx (int) –
action_idx (int) –
run_idx (int) –
- Return type:
None
- set_EAR_start(task_insert_ID, element_iteration_idx, action_idx, run_idx)#
- Parameters:
task_insert_ID (int) –
element_iteration_idx (int) –
action_idx (int) –
run_idx (int) –
- Return type:
None
- set_EAR_submission_indices(sub_idx, EAR_indices)#
- Parameters:
sub_idx (int) –
EAR_indices (Tuple[int, int, int, int]) –
- Return type:
None
- set_jobscript_job_ID(sub_idx, js_idx, job_ID)#
- Parameters:
sub_idx (int) –
js_idx (int) –
job_ID (int) –
- Return type:
None
- set_jobscript_submit_time(sub_idx, js_idx, submit_time)#
- Parameters:
sub_idx (int) –
js_idx (int) –
submit_time (datetime) –
- Return type:
None
- set_jobscript_version_info(sub_idx, js_idx, vers_info)#
- Parameters:
sub_idx (int) –
js_idx (int) –
vers_info (Tuple) –
- Return type:
None
- abstract set_parameter(index, data)#
Set the value of a pre-allocated parameter.
- Parameters:
index (int) –
data (Any) –
- Return type:
None
- property store_name#
- abstract property store_path#
Get the store path, which may be the same as the workflow path.
- property ts_fmt: str#
- update_loop_num_added_iters(loop_idx, num_added_iters)#
- Parameters:
loop_idx (int) –
num_added_iters (int) –
- property workflow_path: Path#
- abstract classmethod write_empty_workflow(template_js, template_components_js, workflow_path, replaced_dir, creation_info)#
- Parameters:
template_js (Dict) –
template_components_js (Dict) –
workflow_path (Path) –
replaced_dir (Path) –
creation_info (Dict) –
- Return type:
None
- class hpcflow.sdk.persistence.base.PersistentStoreFeatures(jobscript_parallelism=False, EAR_parallelism=False, schedulers=False, submission=False)#
Bases:
object
Class to represent the features provided by a persistent store.
- Parameters:
jobscript_parallelism (bool) – If True, the store supports workflows running multiple independent jobscripts simultaneously.
EAR_parallelism (bool) – If True, the store supports workflows running multiple EARs simultaneously.
schedulers (bool) – If True, the store supports submitting workflows to a scheduler.
submission (bool) – If True, the store supports submission. If False, the store can be considered to be an archive, which would need transforming to another store type before submission.
- EAR_parallelism: bool = False#
- jobscript_parallelism: bool = False#
- schedulers: bool = False#
- submission: bool = False#
- hpcflow.sdk.persistence.base.dropbox_retry_fail(err)#
- Parameters:
err (Exception) –
- Return type:
None
- hpcflow.sdk.persistence.base.remove_dir(dir_path)#
Try very hard to delete a directory.
Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is the case on a failed workflow creation (e.g. missing inputs), so in addition to catching PermissionErrors generated when Dropbox has a lock on files, we repeatedly try deleting the directory tree.
- Parameters:
dir_path (Path) –
- Return type:
None
- hpcflow.sdk.persistence.base.rename_dir(replaced_dir, original_dir)#
- Return type:
None
hpcflow.sdk.persistence.json module#
- class hpcflow.sdk.persistence.json.JSONPersistentStore(workflow)#
Bases:
PersistentStore
A verbose but inefficient storage backend, to help with understanding and debugging.
Notes
We split the data across three JSON files to support submission to schedulers. During scheduler submission, if a task is quick, parameter data might be written at the same time as both submission metadata (jobscript submission time) and EAR metadata (EAR start/end time). Keeping these in separate files avoids simultaneous writes to a single file.
- Parameters:
workflow (Workflow) –
- cached_load()#
Context manager to cache the whole JSON document, allowing for multiple read operations with one disk read.
- Return type:
Iterator[Dict]
- check_parameters_exist(indices)#
- Parameters:
indices (int | List[int]) –
- Return type:
bool | List[bool]
- commit_pending()#
- Return type:
None
- copy(path=None)#
Make a copy of the store.
- Parameters:
path (PathLike | None) –
- Return type:
None
- exists()#
- Return type:
bool
- get_all_parameter_data()#
- Return type:
Dict[int, Any]
- get_all_tasks_metadata()#
- Return type:
List[Dict]
- get_loops()#
- Return type:
List[Dict]
- get_num_added_tasks()#
Get the total number of tasks ever added to the workflow, regardless of whether any of those tasks were subsequently removed from the workflow.
- Return type:
int
- get_parameter_data(index)#
- Parameters:
index (int) –
- Return type:
Tuple[bool, Any]
- get_parameter_source(index)#
- Parameters:
index (int) –
- Return type:
Dict
- get_submissions()#
- Return type:
List[Dict]
- get_task_elements(task_idx, task_insert_ID, selection, keep_iterations_idx=False)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
selection (slice) –
keep_iterations_idx (bool) –
- Return type:
List[Dict]
- get_task_idx_from_insert_ID(insert_ID)#
- get_template()#
- Return type:
Dict
- is_modified_on_disk()#
Check if the workflow (metadata) has been modified on disk since initial load (this is bad).
- Return type:
bool | Dict
- is_parameter_set(index)#
- Parameters:
index (int) –
- Return type:
bool
- load()#
- Return type:
Dict
- load_metadata()#
- Return type:
Dict
- load_parameter_data()#
- Return type:
Dict
- load_submissions()#
- Return type:
Dict
- classmethod path_has_store(path)#
Is a given workflow path of this store type?
- reinstate_replaced_dir()#
- Return type:
None
- remove_replaced_dir()#
- Return type:
None
- set_parameter(index, data)#
Set the value of a pre-allocated parameter.
- Parameters:
index (int) –
data (Any) –
- Return type:
None
- property store_path#
Get the store path, which may be the same as the workflow path.
- classmethod write_empty_workflow(template_js, template_components_js, workflow_path, replaced_dir, creation_info)#
- Parameters:
template_js (Dict) –
template_components_js (Dict) –
workflow_path (Path) –
replaced_dir (Path) –
creation_info (Dict) –
- Return type:
None
hpcflow.sdk.persistence.zarr module#
- class hpcflow.sdk.persistence.zarr.ZarrPersistentStore(workflow)#
Bases:
PersistentStore
An efficient storage backend using Zarr that supports parameter-setting from multiple processes.
- Parameters:
workflow (Workflow) –
- add_EARs(task_idx, task_insert_ID, element_iter_idx, EARs, param_src_updates)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
element_iter_idx (int) –
EARs (Dict) –
param_src_updates (Dict) –
- Return type:
None
- add_element_iterations(task_idx, task_insert_ID, element_iterations, element_iters_idx)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
element_iterations (List[Dict]) –
element_iters_idx (Dict[int, List[int]]) –
- Return type:
None
- add_elements(task_idx, task_insert_ID, elements, element_iterations)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
elements (List[Dict]) –
element_iterations (List[Dict]) –
- Return type:
None
- cached_load()#
Context manager to cache the root attributes (i.e. metadata).
- Return type:
Iterator[Dict]
- check_parameters_exist(indices)#
- Parameters:
indices (int | List[int]) –
- Return type:
bool | List[bool]
- commit_pending()#
- Return type:
None
- copy(path=None)#
Make a copy of the store.
- Parameters:
path (PathLike | None) –
- Return type:
None
- exists()#
- Return type:
bool
- get_all_parameter_data()#
- Return type:
Dict[int, Any]
- get_all_tasks_metadata()#
- Return type:
List[Dict]
- get_loops()#
- Return type:
List[Dict]
- get_num_added_tasks()#
Get the total number of tasks ever added to the workflow, regardless of whether any of those tasks were subsequently removed from the workflow.
- Return type:
int
- get_parameter_data(index)#
- Parameters:
index (int) –
- Return type:
Tuple[bool, Any]
- get_parameter_source(index)#
- Parameters:
index (int) –
- Return type:
Dict
- get_submissions()#
- Return type:
List[Dict]
- get_task_elements(task_idx, task_insert_ID, selection, keep_iterations_idx=False)#
- Parameters:
task_idx (int) –
task_insert_ID (int) –
selection (slice) –
keep_iterations_idx (bool) –
- Return type:
List
- get_template()#
- Return type:
Dict
- property has_pending: bool#
Returns True if there are pending changes that are not yet committed.
- is_modified_on_disk()#
Check if the workflow (metadata) has been modified on disk since initial load (this is bad).
- Return type:
bool
- is_parameter_set(index)#
- Parameters:
index (int) –
- Return type:
bool
- load_metadata()#
- classmethod path_has_store(path)#
Is a given workflow path of this store type?
- reinstate_replaced_dir()#
- Return type:
None
- reject_pending()#
- Return type:
None
- remove_replaced_dir()#
- Return type:
None
- set_parameter(index, data)#
Set the value of a pre-allocated parameter.
- Parameters:
index (int) –
data (Any) –
- Return type:
None
- property store_path#
Get the store path, which may be the same as the workflow path.
- classmethod write_empty_workflow(template_js, template_components_js, workflow_path, replaced_dir, creation_info)#
- Parameters:
template_js (Dict) –
template_components_js (Dict) –
workflow_path (Path) –
replaced_dir (Path) –
creation_info (Dict) –
- Return type:
None
Module contents#
- hpcflow.sdk.persistence.store_cls_from_path(workflow_path)#
- Parameters:
workflow_path (Path) –
- Return type:
Type[PersistentStore]
- hpcflow.sdk.persistence.store_cls_from_str(store_format)#
- Parameters:
store_format (str) –
- Return type:
Type[PersistentStore]
- hpcflow.sdk.persistence.temporary_workflow_rename(path)#
Rename an existing same-path workflow directory so we can restore it if workflow creation fails.