hpcflow.sdk.persistence.base.PersistentStore#
- class hpcflow.sdk.persistence.base.PersistentStore(app, workflow, path, fs=None)#
Bases:
ABC
,Generic
[AnySTask
,AnySElement
,AnySElementIter
,AnySEAR
,AnySParameter
]An abstract class representing a persistent workflow store.
- Parameters:
app (App) – The main hpcflow core.
workflow (Workflow) – The workflow being persisted.
path (pathlib.Path) – Where to hold the store.
fs (fsspec.AbstractFileSystem) – Optionally, information about how to access the store.
Methods
Add a new EAR to an element iteration.
Add a new element to a task.
Add a new iteration to an element.
Add an element set to a task.
Add a file that will be associated with a parameter.
Add a new loop to the workflow.
Add a parameter that is set to a value.
Add a new submission.
Add a submission part.
Add a new task to the workflow.
Add template components to the workflow.
Add a parameter that is not set to any value.
Context manager for using the persistent element/iteration/run cache.
Perform a load with cache enabled while the
with
-wrapped code runs.For each parameter ID, return True if it exists, else False.
Copy the workflow store.
Delete the persistent workflow.
Permanently delete the workflow data with no confirmation.
Whether the element action run with the given ID was skipped.
Get element action runs with the given IDs.
Get the workflow creation data.
Get element iterations with the given IDs.
Get elements with the given IDs.
Retrieve all loops, including pending.
Retrieve loops by index (ID), including pending.
Get the workflow name.
Get whether the parameters with the given IDs are set.
Get the sources of the parameters with the given IDs.
Get parameters with the given IDs.
Retrieve all submissions, including pending.
Get submissions with the given IDs.
Get a task.
Get element data by an indices within a given task.
Retrieve all tasks, including pending.
Get tasks with the given IDs.
Get the workflow template.
Get all template components, including pending.
Get the timestamp format.
Get the timestamp format for names.
Generate a valid store from a specification in terms of nested elements/iterations/EARs.
Reinstate a replaced directory.
Try very hard to delete a directory or file.
Remove a replaced directory.
Revert the replaced workflow path to its original name.
Commit pending changes to disk, if not in batch-update mode.
Mark an element action run as finished.
Mark an element action run as skipped.
Mark an element action run as started.
Set the submission index for an element action run.
Mark an element action run as initialised.
Set details of a file, including whether it is associated with a parameter.
Set the metadata for a job script.
Set the value of a parameter.
Convert this store into expanded form.
Add iterations to a loop.
Set the parents of a loop.
Set the source of a parameter.
Context manager for managing StoreResource objects associated with the store.
Write an empty workflow.
Convert this store into archival form.
Attributes
Cache for persistent EARs.
Cache for persistent elements.
Cache for persistent element iterations.
Whether there are any pending changes.
Does this store support workflow submission?
The logger to use.
Cache for total number of persistent EARs.
Cache for number of persistent tasks.
Cache for persistent parameter sources.
Cache for persistent parameters.
Cache for persistent tasks.
The format for timestamps.
Whether to use a cache.
The workflow this relates to.
- add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata=None, save=True)#
Add a new EAR to an element iteration.
- add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#
Add a new element to a task.
- add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#
Add a new iteration to an element.
- add_element_set(task_id, es_js, save=True)#
Add an element set to a task.
- add_file(store_contents, is_input, source, path, contents=None, filename=None, save=True)#
Add a file that will be associated with a parameter.
- add_loop(loop_template, iterable_parameters, parents, num_added_iterations, iter_IDs, save=True)#
Add a new loop to the workflow.
- add_set_parameter(data, source, save=True)#
Add a parameter that is set to a value.
- add_submission(sub_idx, sub_js, save=True)#
Add a new submission.
- add_submission_part(sub_idx, dt_str, submitted_js_idx, save=True)#
Add a submission part.
- add_task(idx, task_template, save=True)#
Add a new task to the workflow.
- add_template_components(temp_comps, save=True)#
Add template components to the workflow.
- add_unset_parameter(source, save=True)#
Add a parameter that is not set to any value.
- Parameters:
source (ParamSource) –
save (bool) –
- Return type:
- cache_ctx()#
Context manager for using the persistent element/iteration/run cache.
- Return type:
Iterator[None]
- abstract cached_load()#
Perform a load with cache enabled while the
with
-wrapped code runs.- Return type:
AbstractContextManager[None]
- check_parameters_exist(ids)#
For each parameter ID, return True if it exists, else False.
- copy(path=None)#
Copy the workflow store.
This does not work on remote filesystems.
- Parameters:
path (PathLike) –
- Return type:
Path
- delete()#
Delete the persistent workflow.
- Return type:
None
- delete_no_confirm()#
Permanently delete the workflow data with no confirmation.
- Return type:
None
- get_EAR_skipped(EAR_ID)#
Whether the element action run with the given ID was skipped.
- get_EARs(ids)#
Get element action runs with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySEAR]
- abstract get_creation_info()#
Get the workflow creation data.
- Return type:
- get_element_iterations(ids)#
Get element iterations with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySElementIter]
- get_elements(ids)#
Get elements with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySElement]
- get_loops()#
Retrieve all loops, including pending.
- Return type:
- get_loops_by_IDs(ids)#
Retrieve loops by index (ID), including pending.
- Parameters:
ids (Iterable[int]) –
- Return type:
- get_parameter_set_statuses(ids)#
Get whether the parameters with the given IDs are set.
- get_parameter_sources(ids)#
Get the sources of the parameters with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
- get_parameters(ids, **kwargs)#
Get parameters with the given IDs.
- get_submissions()#
Retrieve all submissions, including pending.
- get_submissions_by_ID(ids)#
Get submissions with the given IDs.
- get_task_elements(task_id, idx_lst=None)#
Get element data by an indices within a given task.
Element iterations and EARs belonging to the elements are included.
- get_tasks_by_IDs(ids)#
Get tasks with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySTask]
- get_template_components()#
Get all template components, including pending.
- property param_sources_cache: dict[int, ParamSource]#
Cache for persistent parameter sources.
- static prepare_test_store_from_spec(task_spec)#
Generate a valid store from a specification in terms of nested elements/iterations/EARs.
- abstract rechunk_parameter_base(chunk_size=None, backup=True, status=True)#
- abstract rechunk_runs(chunk_size=None, backup=True, status=True)#
- abstract reinstate_replaced_dir()#
Reinstate a replaced directory.
- Return type:
None
- remove_path(path)#
Try very hard to delete a directory or file.
Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is the case on a failed workflow creation (e.g. missing inputs), so in addition to catching PermissionErrors generated when Dropbox has a lock on files, we repeatedly try deleting the directory tree.
- abstract remove_replaced_dir()#
Remove a replaced directory.
- Return type:
None
- rename_path(replaced, original)#
Revert the replaced workflow path to its original name.
This happens when new workflow creation fails and there is an existing workflow with the same name; the original workflow which was renamed, must be reverted.
- save()#
Commit pending changes to disk, if not in batch-update mode.
- Return type:
None
- set_EAR_end(EAR_ID, exit_code, success, save=True)#
Mark an element action run as finished.
- set_EAR_skip(EAR_ID, save=True)#
Mark an element action run as skipped.
- set_EAR_start(EAR_ID, save=True)#
Mark an element action run as started.
- set_EAR_submission_index(EAR_ID, sub_idx, save=True)#
Set the submission index for an element action run.
- set_EARs_initialised(iter_ID, save=True)#
Mark an element action run as initialised.
- set_file(store_contents, is_input, param_id, path, contents=None, filename=None, clean_up=False, save=True)#
Set details of a file, including whether it is associated with a parameter.
- set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)#
Set the metadata for a job script.
- Parameters:
sub_idx (int) –
js_idx (int) –
version_info (VersionInfo | None) –
submit_time (str | None) –
submit_hostname (str | None) –
submit_machine (str | None) –
os_name (str | None) –
shell_name (str | None) –
scheduler_name (str | None) –
scheduler_job_ID (str | None) –
process_ID (int | None) –
save (bool) –
- set_parameter_value(param_id, value, is_file=False, save=True)#
Set the value of a parameter.
- abstract unzip(path='.', log=None)#
Convert this store into expanded form.
- update_loop_num_iters(index, num_added_iters, save=True)#
Add iterations to a loop.
- update_loop_parents(index, num_added_iters, parents, save=True)#
Set the parents of a loop.
- update_param_source(param_sources, save=True)#
Set the source of a parameter.
- Parameters:
param_sources (Mapping[int, ParamSource]) –
save (bool) –
- Return type:
None
- using_resource(res_label: Literal['metadata'], action: str) AbstractContextManager[Metadata] #
- using_resource(res_label: Literal['submissions'], action: str) AbstractContextManager[list[JSONDocument]]
- using_resource(res_label: Literal['parameters'], action: str) AbstractContextManager[dict[str, dict[str, Any]]]
- using_resource(res_label: Literal['attrs'], action: str) AbstractContextManager[ZarrAttrsDict]
Context manager for managing StoreResource objects associated with the store.
- abstract classmethod write_empty_workflow(app, *, template_js, template_components_js, wk_path, fs, name, replaced_wk, creation_info, ts_fmt, ts_name_fmt)#
Write an empty workflow.
- Parameters:
app (BaseApp) –
template_js (TemplateMeta) –
wk_path (str) –
fs (AbstractFileSystem) –
name (str) –
replaced_wk (str | None) –
creation_info (StoreCreationInfo) –
ts_fmt (str) –
ts_name_fmt (str) –
- Return type:
None