hpcflow.sdk.persistence.zarr.ZarrPersistentStore#
- class hpcflow.sdk.persistence.zarr.ZarrPersistentStore(app, workflow, path, fs)#
Bases: PersistentStore[ZarrStoreTask, ZarrStoreElement, ZarrStoreElementIter, ZarrStoreEAR, ZarrStoreParameter]
A persistent store implemented using Zarr.
Methods
Add a new EAR to an element iteration.
Add a new element to a task.
Add a new iteration to an element.
Add an element set to a task.
Add a file that will be associated with a parameter.
Add a new loop to the workflow.
Add a parameter that is set to a value.
Add a new submission.
Add a submission part.
Add a new task to the workflow.
Add template components to the workflow.
Add a parameter that is not set to any value.
Context manager for using the persistent element/iteration/run cache.
Context manager to cache the root attributes.
For each parameter ID, return True if it exists, else False.
Copy the workflow store.
Delete the persistent workflow.
Permanently delete the workflow data with no confirmation.
Whether the element action run with the given ID was skipped.
Get element action runs with the given IDs.
Get information about the creation of the workflow.
Get element iterations with the given IDs.
Get elements with the given IDs.
Retrieve all loops, including pending.
Retrieve loops by index (ID), including pending.
Get the name of the workflow.
Get whether the parameters with the given IDs are set.
Get the sources of the parameters with the given IDs.
Get parameters with the given IDs.
Retrieve all submissions, including pending.
Get submissions with the given IDs.
Get a task.
Get element data by indices within a given task.
Retrieve all tasks, including pending.
Get tasks with the given IDs.
Get the workflow template.
Get all template components, including pending.
Get the format for timestamps.
Get the format for timestamps to use in names.
Generate a store for testing purposes.
Generate a valid store from a specification in terms of nested elements/iterations/EARs.
Rechunk the parameter data to be stored more efficiently.
Rechunk the run data to be stored more efficiently.
Reinstate the directory containing replaced workflow details.
Try very hard to delete a directory or file.
Remove the directory containing replaced workflow details.
Revert the replaced workflow path to its original name.
Commit pending changes to disk, if not in batch-update mode.
Mark an element action run as finished.
Mark an element action run as skipped.
Mark an element action run as started.
Set the submission index for an element action run.
Mark an element action run as initialised.
Set details of a file, including whether it is associated with a parameter.
Set the metadata for a job script.
Set the value of a parameter.
Convert this store into expanded form.
Add iterations to a loop.
Set the parents of a loop.
Set the source of a parameter.
Context manager for managing StoreResource objects associated with the store.
Write an empty persistent workflow.
Convert the persistent store to zipped form.
Attributes
Cache for persistent EARs.
Cache for persistent elements.
Cache for persistent element iterations.
Whether there are any pending changes.
Does this store support workflow submission?
The logger to use.
Cache for total number of persistent EARs.
Cache for number of persistent tasks.
Cache for persistent parameter sources.
Cache for persistent parameters.
Cache for persistent tasks.
The format for timestamps.
Whether to use a cache.
The workflow this relates to.
The underlying store object.
- Parameters:
path (str | Path) –
fs (AbstractFileSystem) –
- add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata=None, save=True)#
Add a new EAR to an element iteration.
- add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#
Add a new element to a task.
- add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#
Add a new iteration to an element.
- add_element_set(task_id, es_js, save=True)#
Add an element set to a task.
- add_file(store_contents, is_input, source, path, contents=None, filename=None, save=True)#
Add a file that will be associated with a parameter.
- add_loop(loop_template, iterable_parameters, parents, num_added_iterations, iter_IDs, save=True)#
Add a new loop to the workflow.
- add_set_parameter(data, source, save=True)#
Add a parameter that is set to a value.
- add_submission(sub_idx, sub_js, save=True)#
Add a new submission.
- add_submission_part(sub_idx, dt_str, submitted_js_idx, save=True)#
Add a submission part.
- add_task(idx, task_template, save=True)#
Add a new task to the workflow.
- add_template_components(temp_comps, save=True)#
Add template components to the workflow.
- add_unset_parameter(source, save=True)#
Add a parameter that is not set to any value.
- Parameters:
source (ParamSource) –
save (bool) –
- Return type:
- cache_ctx()#
Context manager for using the persistent element/iteration/run cache.
- Return type:
Iterator[None]
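A context manager with return type Iterator[None] is typically written with contextlib.contextmanager. The following is a minimal sketch of a cache that is active only inside such a context. It is not hpcflow's implementation: the ToyCachedStore class, its get method, and its reads counter are hypothetical names introduced for illustration.

```python
from contextlib import contextmanager
from typing import Iterator


class ToyCachedStore:
    """Hypothetical stand-in for a store with an opt-in read cache."""

    def __init__(self, backend):
        self._backend = backend  # stands in for data on disk
        self._cache = {}
        self.use_cache = False
        self.reads = 0  # number of reads that reached the backend

    @contextmanager
    def cache_ctx(self) -> Iterator[None]:
        """Enable the cache for the duration of the `with` block."""
        self._cache.clear()
        self.use_cache = True
        try:
            yield
        finally:
            # always switch the cache off again, even on error
            self.use_cache = False
            self._cache.clear()

    def get(self, id_):
        if self.use_cache and id_ in self._cache:
            return self._cache[id_]
        self.reads += 1
        value = self._backend[id_]
        if self.use_cache:
            self._cache[id_] = value
        return value
```

Inside `with store.cache_ctx():`, repeated lookups of the same ID reach the backend only once; outside the block, every lookup goes to the backend.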
- cached_load()#
Context manager to cache the root attributes.
- Return type:
Iterator[None]
- check_parameters_exist(ids)#
For each parameter ID, return True if it exists, else False.
- copy(path=None)#
Copy the workflow store.
This does not work on remote filesystems.
- Parameters:
path (PathLike) –
- Return type:
Path
- delete()#
Delete the persistent workflow.
- Return type:
None
- delete_no_confirm()#
Permanently delete the workflow data with no confirmation.
- Return type:
None
- get_EAR_skipped(EAR_ID)#
Whether the element action run with the given ID was skipped.
- get_EARs(ids)#
Get element action runs with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySEAR]
- get_creation_info()#
Get information about the creation of the workflow.
- get_element_iterations(ids)#
Get element iterations with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySElementIter]
- get_elements(ids)#
Get elements with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySElement]
- get_loops()#
Retrieve all loops, including pending.
- Return type:
- get_loops_by_IDs(ids)#
Retrieve loops by index (ID), including pending.
- Parameters:
ids (Iterable[int]) –
- Return type:
- get_name()#
Get the name of the workflow.
- get_parameter_set_statuses(ids)#
Get whether the parameters with the given IDs are set.
- get_parameter_sources(ids)#
Get the sources of the parameters with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
- get_parameters(ids, **kwargs)#
Get parameters with the given IDs.
- get_submissions()#
Retrieve all submissions, including pending.
- get_submissions_by_ID(ids)#
Get submissions with the given IDs.
- get_task_elements(task_id, idx_lst=None)#
Get element data by indices within a given task.
Element iterations and EARs belonging to the elements are included.
- get_tasks_by_IDs(ids)#
Get tasks with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySTask]
- get_template_components()#
Get all template components, including pending.
- get_ts_fmt()#
Get the format for timestamps.
- get_ts_name_fmt()#
Get the format for timestamps to use in names.
- classmethod make_test_store_from_spec(spec, dir=None, path='test_store', overwrite=False)#
Generate a store for testing purposes.
- property param_sources_cache: dict[int, ParamSource]#
Cache for persistent parameter sources.
- static prepare_test_store_from_spec(task_spec)#
Generate a valid store from a specification in terms of nested elements/iterations/EARs.
- rechunk_parameter_base(chunk_size=None, backup=True, status=True)#
Rechunk the parameter data to be stored more efficiently.
- rechunk_runs(chunk_size=None, backup=True, status=True)#
Rechunk the run data to be stored more efficiently.
- reinstate_replaced_dir()#
Reinstate the directory containing replaced workflow details.
- Return type:
None
- remove_path(path)#
Try very hard to delete a directory or file.
Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is what happens when workflow creation fails (e.g. because of missing inputs). So, in addition to catching the PermissionErrors raised when Dropbox holds a lock on files, we repeatedly try to delete the directory tree.
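The retry behaviour described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the method's actual implementation: the helper name remove_path_with_retries and the max_attempts and delay parameters are hypothetical.

```python
import shutil
import time
from pathlib import Path


def remove_path_with_retries(path, max_attempts=5, delay=0.1):
    """Delete a file or directory tree, retrying on transient failures.

    Hypothetical helper: the name and both keyword arguments are
    assumptions for illustration, not part of the hpcflow API.
    Returns True once the path no longer exists, False otherwise.
    """
    path = Path(path)
    for attempt in range(1, max_attempts + 1):
        try:
            if path.is_dir():
                shutil.rmtree(path)
            elif path.exists():
                path.unlink()
        except OSError:
            # PermissionError is a subclass of OSError; a sync client may
            # hold a lock on a file. Re-raise only on the final attempt.
            if attempt == max_attempts:
                raise
        if not path.exists():
            return True
        # the path is still (or again) present: wait, then try again
        time.sleep(delay)
    return False
```

Checking that the path is really gone after each attempt, rather than trusting a successful delete call, covers the case where files reappear because they were re-synced.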
- remove_replaced_dir()#
Remove the directory containing replaced workflow details.
- Return type:
None
- rename_path(replaced, original)#
Revert the replaced workflow path to its original name.
This happens when new workflow creation fails and there is an existing workflow with the same name; the original workflow, which was renamed, must be reverted.
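The scenario can be illustrated with a small, self-contained sketch: an existing directory is moved aside, a build step fails, and the original is then restored under its original name. The function create_with_rollback, the build callback, and the ".replaced" suffix are all assumptions made for this example and do not come from hpcflow.

```python
import shutil
from pathlib import Path


def create_with_rollback(wk_path, build):
    """Create a workflow directory, restoring any replaced one on failure.

    Hypothetical helper for illustration only. `build` is a callable that
    receives the target path and creates the new workflow there.
    Returns the path of the replaced directory, or None if there was none.
    """
    wk_path = Path(wk_path)
    replaced = None
    if wk_path.exists():
        # move the existing workflow aside (the suffix is an assumption)
        replaced = wk_path.with_name(wk_path.name + ".replaced")
        wk_path.rename(replaced)
    try:
        build(wk_path)
    except Exception:
        # discard whatever the failed build left behind ...
        if wk_path.exists():
            shutil.rmtree(wk_path)
        # ... and revert the replaced path to its original name
        if replaced is not None:
            replaced.rename(wk_path)
        raise
    return replaced
```

On success the replaced directory is left in place, so the caller can decide whether to remove it or keep it.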
- save()#
Commit pending changes to disk, if not in batch-update mode.
- Return type:
None
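The interaction between the save=True argument accepted by most methods on this class and batch-update mode can be pictured with a toy model. This sketch only illustrates the general pattern. The ToyStore class, its add and batch_update members, and the choice to leave changes pending when a batch fails are assumptions, not a description of how hpcflow behaves.

```python
from contextlib import contextmanager


class ToyStore:
    """Hypothetical stand-in for a store that tracks pending changes."""

    def __init__(self):
        self.pending = []  # changes not yet written
        self.committed = []  # stands in for data on disk
        self._in_batch = False

    @property
    def has_pending(self):
        return bool(self.pending)

    def add(self, item, save=True):
        self.pending.append(item)
        if save:
            self.save()

    def save(self):
        # commit pending changes only when not in batch-update mode
        if not self._in_batch:
            self._commit()

    def _commit(self):
        self.committed.extend(self.pending)
        self.pending.clear()

    @contextmanager
    def batch_update(self):
        self._in_batch = True
        try:
            yield
            # reached only if the block raised no exception
            self._commit()
        finally:
            self._in_batch = False
```

In this model, calls made inside `with store.batch_update():` accumulate as pending changes and are written once when the block exits, even though each call passes save=True.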
- set_EAR_end(EAR_ID, exit_code, success, save=True)#
Mark an element action run as finished.
- set_EAR_skip(EAR_ID, save=True)#
Mark an element action run as skipped.
- set_EAR_start(EAR_ID, save=True)#
Mark an element action run as started.
- set_EAR_submission_index(EAR_ID, sub_idx, save=True)#
Set the submission index for an element action run.
- set_EARs_initialised(iter_ID, save=True)#
Mark an element action run as initialised.
- set_file(store_contents, is_input, param_id, path, contents=None, filename=None, clean_up=False, save=True)#
Set details of a file, including whether it is associated with a parameter.
- set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)#
Set the metadata for a job script.
- Parameters:
sub_idx (int) –
js_idx (int) –
version_info (VersionInfo | None) –
submit_time (str | None) –
submit_hostname (str | None) –
submit_machine (str | None) –
os_name (str | None) –
shell_name (str | None) –
scheduler_name (str | None) –
scheduler_job_ID (str | None) –
process_ID (int | None) –
save (bool) –
- set_parameter_value(param_id, value, is_file=False, save=True)#
Set the value of a parameter.
- unzip(path='.', log=None)#
Convert this store into expanded form.
- update_loop_num_iters(index, num_added_iters, save=True)#
Add iterations to a loop.
- update_loop_parents(index, num_added_iters, parents, save=True)#
Set the parents of a loop.
- update_param_source(param_sources, save=True)#
Set the source of a parameter.
- Parameters:
param_sources (Mapping[int, ParamSource]) –
save (bool) –
- Return type:
None
- using_resource(res_label, action)#
Context manager for managing StoreResource objects associated with the store.
- classmethod write_empty_workflow(app, *, template_js, template_components_js, wk_path, fs, name, replaced_wk, ts_fmt, ts_name_fmt, creation_info, compressor='blosc', compressor_kwargs=None)#
Write an empty persistent workflow.
- Parameters:
app (BaseApp) –
template_js (TemplateMeta) –
wk_path (str) –
fs (AbstractFileSystem) –
name (str) –
replaced_wk (str | None) –
ts_fmt (str) –
ts_name_fmt (str) –
creation_info (StoreCreationInfo) –
compressor (str | None) –
- Return type:
None
- property zarr_store: Store#
The underlying store object.
- zip(path='.', log=None, overwrite=False, include_execute=False, include_rechunk_backups=False)#
Convert the persistent store to zipped form.
- Parameters:
path (str) – Path at which to create the new zipped workflow. If this is an existing directory, the zip file will be created within this directory. Otherwise, this path is assumed to be the full file path to the new zip file.
log (str | None) –
overwrite (bool) –
include_execute (bool) –
include_rechunk_backups (bool) –
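The rule for interpreting path can be sketched as a small helper. This is an illustration of the documented rule only: the function resolve_zip_path and the default file name derived from workflow_name are assumptions, since the name used for the zip file inside an existing directory is not stated here.

```python
from pathlib import Path


def resolve_zip_path(path, workflow_name):
    """Return the full path of the zip file to create.

    Hypothetical helper: if `path` is an existing directory, the zip file
    goes inside it (the "<workflow_name>.zip" file name is an assumption);
    otherwise `path` is taken to be the full path of the new zip file.
    """
    path = Path(path)
    if path.is_dir():
        return path / f"{workflow_name}.zip"
    return path
```

With the default path='.', the first branch applies, so the zip file would be created in the current working directory.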