hpcflow.sdk.persistence.base.PersistentStore#
- class hpcflow.sdk.persistence.base.PersistentStore(app, workflow, path, fs=None)#
Bases: ABC, Generic[AnySTask, AnySElement, AnySElementIter, AnySEAR, AnySParameter]
An abstract class representing a persistent workflow store.
- Parameters:
app (App) – The main hpcflow core.
workflow (Workflow) – The workflow being persisted.
path (pathlib.Path) – Where to hold the store.
fs (fsspec.AbstractFileSystem) – Optionally, information about how to access the store.
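The class is both abstract and generic over its record types. As an illustration of that shape only, here is a minimal sketch. The names ToyStore, InMemoryToyStore and TaskT are hypothetical and are not part of hpcflow, and the sketch assumes nothing about how real store subclasses are implemented.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

# Hypothetical stand-in for a type parameter such as AnySTask.
TaskT = TypeVar("TaskT")


class ToyStore(ABC, Generic[TaskT]):
    """Hypothetical abstract store, parameterised by its task record type."""

    @abstractmethod
    def get_tasks_by_IDs(self, ids):
        """Return the task records with the given IDs."""


class InMemoryToyStore(ToyStore[dict]):
    """Concrete toy subclass that keeps task records in a dict."""

    def __init__(self):
        self._tasks = {0: {"name": "t0"}, 1: {"name": "t1"}}

    def get_tasks_by_IDs(self, ids):
        # Records are returned in the order the IDs were requested.
        return [self._tasks[i] for i in ids]


toy_store = InMemoryToyStore()
tasks = toy_store.get_tasks_by_IDs([1, 0])
```

Because ToyStore declares an abstract method, it cannot be instantiated directly; only a subclass that implements every abstract method can.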
Methods
Add a new EAR to an element iteration.
Add a new element to a task.
Add a new iteration to an element.
Add an element set to a task.
Add a file that will be associated with a parameter.
Add a new loop to the workflow.
Add a parameter that is set to a value.
Add a new submission.
Add a new task to the workflow.
Add template components to the workflow.
Add a parameter that is not set to any value.
Context manager for using the persistent element/iteration/run cache.
Perform a load with cache enabled while the with-wrapped code runs.
For each parameter ID, return True if it exists, else False.
Copy the workflow store.
Delete the persistent workflow.
Permanently delete the workflow data with no confirmation.
Whether the element action run with the given ID was skipped.
Get element action runs with the given IDs.
Get the workflow creation data.
Retrieve the run directories array.
Get element iterations with the given IDs.
Get elements with the given IDs.
For the specified jobscript, retrieve the values of jobscript-submit-time attributes.
For the specified jobscript-block, retrieve the dependencies.
For the specified jobscript-block, retrieve the run ID array.
For the specified jobscript-block, retrieve the task-actions array.
For the specified jobscript-block, retrieve the task-elements mapping.
Retrieve all loops, including pending.
Retrieve loops by index (ID), including pending.
Get the workflow name.
Get whether the parameters with the given IDs are set.
Get the sources of the parameters with the given IDs.
Get parameters with the given IDs.
Retrieve the values of submission attributes that are stored at submit-time.
Retrieve all submissions, including pending.
Get submissions with the given IDs.
Get a task.
Get element data by indices within a given task.
Retrieve all tasks, including pending.
Get tasks with the given IDs.
Get the workflow template.
Get all template components, including pending.
Retrieve the contents of a text file stored within the workflow.
Get the timestamp format.
Get the timestamp format for names.
Context manager for using the parameters-metadata cache.
Generate a valid store from a specification in terms of nested elements/iterations/EARs.
Reinstate a replaced directory.
Try very hard to delete a directory or file.
Remove a replaced directory.
Revert the replaced workflow path to its original name.
Commit pending changes to disk, if not in batch-update mode.
Mark an element action run as finished.
Mark element action runs as skipped for the specified reasons.
Mark an element action run as started.
Mark an element action run as initialised.
Set details of a file, including whether it is associated with a parameter.
Set the metadata for a jobscript.
Set the value of a parameter.
Set multiple non-file parameter values by parameter IDs.
Set the run submission data, like the submission index for an element action run.
Convert this store into expanded form.
Update metadata that is set at submit-time.
Update data indices of one or more iterations.
Add iterations to a loop.
Set the parents of a loop.
Set the source of a parameter.
Update data indices of one or more runs.
Context manager for managing StoreResource objects associated with the store.
Write an empty workflow.
Convert this store into archival form.
Attributes
Cache for persistent EARs.
Cache for persistent elements.
Cache for persistent element iterations.
Whether there are any pending changes.
Does this store support workflow submission?
The logger to use.
Cache for total number of persistent EARs.
Cache for number of persistent tasks.
Cache for persistent parameter sources.
Cache for persistent parameters.
Cache for persistent tasks.
The format for timestamps.
Whether to use a cache.
The workflow this relates to.
- add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata=None, save=True)#
Add a new EAR to an element iteration.
- add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#
Add a new element to a task.
- add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#
Add a new iteration to an element.
- add_element_set(task_id, es_js, save=True)#
Add an element set to a task.
- add_file(store_contents, is_input, source, path, contents=None, filename=None, save=True)#
Add a file that will be associated with a parameter.
- add_loop(loop_template, iterable_parameters, output_parameters, parents, num_added_iterations, iter_IDs, save=True)#
Add a new loop to the workflow.
- add_set_parameter(data, source, save=True)#
Add a parameter that is set to a value.
- add_submission(sub_idx, sub_js, save=True)#
Add a new submission.
- add_task(idx, task_template, save=True)#
Add a new task to the workflow.
- add_template_components(temp_comps, save=True)#
Add template components to the workflow.
- add_unset_parameter(source, save=True)#
Add a parameter that is not set to any value.
- Parameters:
source (ParamSource) –
save (bool) –
- Return type:
- cache_ctx()#
Context manager for using the persistent element/iteration/run cache.
- Return type:
Iterator[None]
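A context manager with return type Iterator[None] is typically written as a generator decorated with contextlib.contextmanager. The sketch below shows that general pattern with a hypothetical ToyCache class. The use_cache flag mirrors the attribute listed above, but the behaviour on exit (clearing and disabling the cache) is an assumption, not a description of hpcflow's implementation.

```python
from contextlib import contextmanager
from typing import Iterator


class ToyCache:
    """Hypothetical object with a switchable cache; not hpcflow's implementation."""

    def __init__(self) -> None:
        self.use_cache = False
        self.cache = {}

    @contextmanager
    def cache_ctx(self) -> Iterator[None]:
        """Enable the cache for the duration of the with-block."""
        self.use_cache = True
        try:
            yield
        finally:
            # Assumption: the cache is disabled and emptied on exit.
            self.use_cache = False
            self.cache.clear()


toy = ToyCache()
with toy.cache_ctx():
    toy.cache[0] = "element 0"
    enabled_inside = toy.use_cache
```

The try/finally ensures the cache is reset even if the wrapped code raises.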
- abstract cached_load()#
Perform a load with cache enabled while the with-wrapped code runs.
- Return type:
AbstractContextManager[None]
- check_parameters_exist(ids)#
For each parameter ID, return True if it exists, else False.
- copy(path=None)#
Copy the workflow store.
This does not work on remote filesystems.
- Parameters:
path (PathLike) –
- Return type:
Path
- delete()#
Delete the persistent workflow.
- Return type:
None
- delete_no_confirm()#
Permanently delete the workflow data with no confirmation.
- Return type:
None
- get_EAR_skipped(EAR_ID)#
Whether the element action run with the given ID was skipped.
- get_EARs(ids)#
Get element action runs with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySEAR]
- abstract get_creation_info()#
Get the workflow creation data.
- Return type:
- abstract get_dirs_array()#
Retrieve the run directories array.
- Return type:
NDArray
- get_element_iterations(ids)#
Get element iterations with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySElementIter]
- get_elements(ids)#
Get elements with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySElement]
- get_jobscript_at_submit_metadata(sub_idx, js_idx, metadata_attr)#
For the specified jobscript, retrieve the values of jobscript-submit-time attributes.
Notes
This method may need to be overridden if these jobscript-submit-time attributes are stored separately from the remainder of the jobscript attributes.
- get_jobscript_block_dependencies(sub_idx, js_idx, blk_idx, js_dependencies)#
For the specified jobscript-block, retrieve the dependencies.
Notes
This method may need to be overridden if these attributes are stored separately from the remainder of the submission attributes.
- get_jobscript_block_run_ID_array(sub_idx, js_idx, blk_idx, run_ID_arr)#
For the specified jobscript-block, retrieve the run ID array.
Notes
This method may need to be overridden if these attributes are stored separately from the remainder of the submission attributes.
- get_jobscript_block_task_actions_array(sub_idx, js_idx, blk_idx, task_actions_arr)#
For the specified jobscript-block, retrieve the task-actions array.
Notes
This method may need to be overridden if these attributes are stored separately from the remainder of the submission attributes.
- get_jobscript_block_task_elements_map(sub_idx, js_idx, blk_idx, task_elems_map)#
For the specified jobscript-block, retrieve the task-elements mapping.
Notes
This method may need to be overridden if these attributes are stored separately from the remainder of the submission attributes.
- get_loops()#
Retrieve all loops, including pending.
- Return type:
- get_loops_by_IDs(ids)#
Retrieve loops by index (ID), including pending.
- Parameters:
ids (Iterable[int]) –
- Return type:
- get_parameter_set_statuses(ids)#
Get whether the parameters with the given IDs are set.
- get_parameter_sources(ids)#
Get the sources of the parameters with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
- get_parameters(ids, **kwargs)#
Get parameters with the given IDs.
- get_submission_at_submit_metadata(sub_idx, metadata_attr)#
Retrieve the values of submission attributes that are stored at submit-time.
Notes
This method may need to be overridden if these attributes are stored separately from the remainder of the submission attributes.
- get_submissions()#
Retrieve all submissions, including pending.
- get_submissions_by_ID(ids)#
Get submissions with the given IDs.
- get_task_elements(task_id, idx_lst=None)#
Get element data by indices within a given task.
Element iterations and EARs belonging to the elements are included.
- get_tasks_by_IDs(ids)#
Get tasks with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySTask]
- get_template_components()#
Get all template components, including pending.
- get_text_file(path)#
Retrieve the contents of a text file stored within the workflow.
- property param_sources_cache: dict[int, ParamSource]#
Cache for persistent parameter sources.
- parameters_metadata_cache()#
Context manager for using the parameters-metadata cache.
Notes
This method can be overridden by a subclass to provide an implementation-specific cache of metadata associated with parameters, or even parameter data itself.
Using this cache precludes writing/setting parameter data.
- static prepare_test_store_from_spec(task_spec)#
Generate a valid store from a specification in terms of nested elements/iterations/EARs.
- abstract rechunk_parameter_base(chunk_size=None, backup=True, status=True)#
- abstract rechunk_runs(chunk_size=None, backup=True, status=True)#
- abstract reinstate_replaced_dir()#
Reinstate a replaced directory.
- Return type:
None
- remove_path(path)#
Try very hard to delete a directory or file.
Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is what happens when workflow creation fails (e.g. because of missing inputs). So, in addition to catching the PermissionErrors raised when Dropbox holds a lock on files, we repeatedly try to delete the directory tree.
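The retry strategy described above can be sketched with the standard library alone. The helper name remove_path_with_retries and its retries and delay parameters are hypothetical; the actual retry count and timing used by hpcflow are not stated here.

```python
import shutil
import tempfile
import time
from pathlib import Path


def remove_path_with_retries(path, retries=5, delay=0.1):
    """Hypothetical helper: delete a file or directory tree, retrying on PermissionError."""
    path = Path(path)
    for _ in range(retries):
        try:
            if path.is_dir():
                shutil.rmtree(path)
            elif path.exists():
                path.unlink()
        except PermissionError:
            # Another process may hold a lock; wait briefly, then try again.
            time.sleep(delay)
            continue
        if not path.exists():
            return True
        # The path reappeared or was only partly removed; wait and retry.
        time.sleep(delay)
    return not path.exists()


# Usage: build a small directory tree and remove it.
root = Path(tempfile.mkdtemp())
(root / "sub").mkdir()
(root / "sub" / "file.txt").write_text("data")
removed = remove_path_with_retries(root)
```

Checking that the path is really gone after each attempt, rather than trusting a single successful call, is what makes the loop robust against files being re-created by a sync client.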
- abstract remove_replaced_dir()#
Remove a replaced directory.
- Return type:
None
- rename_path(replaced, original)#
Revert the replaced workflow path to its original name.
This happens when creation of a new workflow fails and there is an existing workflow with the same name; the original workflow, which was renamed, must be restored.
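The rename-and-revert sequence can be illustrated on a local filesystem with pathlib. This is a sketch under assumptions: the helper revert_replaced and the directory names are hypothetical, and the real method may operate through the store's filesystem object rather than on local paths.

```python
import tempfile
from pathlib import Path


def revert_replaced(replaced: Path, original: Path) -> None:
    """Hypothetical helper: move the renamed workflow back to its original path."""
    replaced.rename(original)


base = Path(tempfile.mkdtemp())
original = base / "my_workflow"
original.mkdir()
(original / "data.txt").write_text("old workflow")

# An existing workflow is moved aside before a new one is created at the same path.
replaced = base / "my_workflow_replaced"
original.rename(replaced)

# Suppose creating the new workflow fails before anything is written: revert.
revert_replaced(replaced, original)
restored_text = (original / "data.txt").read_text()
```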
- save()#
Commit pending changes to disk, if not in batch-update mode.
- Return type:
None
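Many methods on this class take a save flag, and save() commits pending changes unless batch-update mode is active. The sketch below shows one way such a pending/commit pattern can work. ToyPendingStore, its attributes, and the assumption that save=True simply calls save() are all hypothetical and not taken from hpcflow's source.

```python
class ToyPendingStore:
    """Hypothetical store that buffers changes until they are committed."""

    def __init__(self):
        self.committed = []      # stands in for data on disk
        self.pending = []        # changes not yet committed
        self.batch_mode = False  # when True, save() does nothing

    def add_task(self, name, save=True):
        self.pending.append(name)
        if save:
            # Assumption: save=True triggers a commit attempt straight away.
            self.save()

    def save(self):
        """Commit pending changes, unless batch-update mode is active."""
        if self.batch_mode:
            return
        self.committed.extend(self.pending)
        self.pending.clear()


pending_store = ToyPendingStore()
pending_store.add_task("t0")              # committed immediately
pending_store.add_task("t1", save=False)  # left pending
has_pending = bool(pending_store.pending)
pending_store.save()                      # commits "t1"
```

Passing save=False for a series of calls and committing once at the end avoids one write per change.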
- set_EAR_end(EAR_ID, exit_code, success, snapshot, save=True)#
Mark an element action run as finished.
- set_EAR_skip(skip_reasons, save=True)#
Mark element action runs as skipped for the specified reasons.
- set_EAR_start(EAR_ID, run_dir, port_number, save=True)#
Mark an element action run as started.
- set_EARs_initialised(iter_ID, save=True)#
Mark an element action run as initialised.
- set_file(store_contents, is_input, param_id, path, contents=None, filename=None, clean_up=False, save=True)#
Set details of a file, including whether it is associated with a parameter.
- set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, shell_idx=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)#
Set the metadata for a jobscript.
- Parameters:
sub_idx (int) –
js_idx (int) –
version_info (VersionInfo | None) –
submit_time (str | None) –
submit_hostname (str | None) –
submit_machine (str | None) –
shell_idx (int | None) –
os_name (str | None) –
shell_name (str | None) –
scheduler_name (str | None) –
scheduler_job_ID (str | None) –
process_ID (int | None) –
save (bool) –
- set_multi_run_ends(run_ids, run_dirs, exit_codes, successes, save=True)#
- set_multi_run_starts(run_ids, run_dirs, port_number, save=True)#
- set_parameter_value(param_id, value, is_file=False, save=True)#
Set the value of a parameter.
- set_parameter_values(values, save=True)#
Set multiple non-file parameter values by parameter IDs.
- set_run_dirs(run_dir_indices, run_idx, save=True)#
- set_run_submission_data(EAR_ID, cmds_ID, sub_idx, save=True)#
Set the run submission data, like the submission index for an element action run.
- abstract unzip(path='.', log=None)#
Convert this store into expanded form.
- update_at_submit_metadata(sub_idx, submission_parts, save=True)#
Update metadata that is set at submit-time.
- update_iter_data_indices(data_indices)#
Update data indices of one or more iterations.
- update_loop_num_iters(index, num_added_iters, save=True)#
Add iterations to a loop.
- update_loop_parents(index, num_added_iters, parents, save=True)#
Set the parents of a loop.
- update_param_source(param_sources, save=True)#
Set the source of a parameter.
- Parameters:
param_sources (Mapping[int, ParamSource]) –
save (bool) –
- Return type:
None
- update_run_data_indices(data_indices)#
Update data indices of one or more runs.
- using_resource(res_label: Literal['metadata'], action: str) AbstractContextManager[Metadata] #
- using_resource(res_label: Literal['submissions'], action: str) AbstractContextManager[list[dict[str, JSONed]]]
- using_resource(res_label: Literal['parameters'], action: str) AbstractContextManager[dict[str, dict[str, Any]]]
- using_resource(res_label: Literal['runs'], action: str) AbstractContextManager[dict[str, Any]]
- using_resource(res_label: Literal['attrs'], action: str) AbstractContextManager[ZarrAttrsDict]
Context manager for managing StoreResource objects associated with the store.
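The overloaded signatures above tie the type yielded by the context manager to the literal value of res_label. A minimal sketch of that typing pattern follows. ToyResources, its stored data, the log attribute, and the action strings "read" and "update" are illustrative assumptions and do not reproduce hpcflow's StoreResource handling.

```python
from contextlib import contextmanager
from typing import Any, ContextManager, Dict, List, Literal, overload


class ToyResources:
    """Hypothetical holder of labelled resources."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {"metadata": {"name": "wk"}, "submissions": []}
        self.log: List[str] = []

    # Each overload maps one literal label to the type the with-block receives.
    @overload
    def using_resource(
        self, res_label: Literal["metadata"], action: str
    ) -> ContextManager[Dict[str, Any]]: ...

    @overload
    def using_resource(
        self, res_label: Literal["submissions"], action: str
    ) -> ContextManager[List[Any]]: ...

    @contextmanager
    def using_resource(self, res_label, action):
        """Open the labelled resource, yield it, and always record the close."""
        self.log.append(f"open {res_label} ({action})")
        try:
            yield self._data[res_label]
        finally:
            self.log.append(f"close {res_label}")


resources = ToyResources()
with resources.using_resource("metadata", "read") as md:
    wk_name = md["name"]
with resources.using_resource("submissions", "update") as subs:
    subs.append({"index": 0})
```

At runtime only the final, undecorated-by-overload definition is used; the overloads exist so that a type checker can infer, for example, that md is a dictionary and subs is a list.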
- abstract classmethod write_empty_workflow(app, *, template_js, template_components_js, wk_path, fs, name, replaced_wk, creation_info, ts_fmt, ts_name_fmt)#
Write an empty workflow.
- Parameters:
app (BaseApp) –
template_js (TemplateMeta) –
wk_path (str) –
fs (AbstractFileSystem) –
name (str) –
replaced_wk (str | None) –
creation_info (StoreCreationInfo) –
ts_fmt (str) –
ts_name_fmt (str) –
- Return type:
None