hpcflow.sdk.persistence.zarr.ZarrPersistentStore#
- class hpcflow.sdk.persistence.zarr.ZarrPersistentStore(app, workflow, path, fs)#
Bases: PersistentStore[ZarrStoreTask, ZarrStoreElement, ZarrStoreElementIter, ZarrStoreEAR, ZarrStoreParameter]
A persistent store implemented using Zarr.
Methods
Add a new EAR to an element iteration.
Add a new element to a task.
Add a new iteration to an element.
Add an element set to a task.
Add a file that will be associated with a parameter.
Add a new loop to the workflow.
Add a parameter that is set to a value.
Add a new submission.
Add a new task to the workflow.
Add template components to the workflow.
Add a parameter that is not set to any value.
Context manager for using the persistent element/iteration/run cache.
Context manager to cache the root attributes.
For each parameter ID, return True if it exists, else False.
Clear the cache of at-submit-time jobscript metadata.
Copy the workflow store.
Delete the persistent workflow.
Permanently delete the workflow data with no confirmation.
Whether the element action run with the given ID was skipped.
Get element action runs with the given IDs.
Get information about the creation of the workflow.
Retrieve the run directories array.
Get element iterations with the given IDs.
Get elements with the given IDs.
For the specified jobscript, retrieve the values of jobscript-submit-time attributes.
For the specified jobscript-block, retrieve the dependencies.
For the specified jobscript-block, retrieve the run ID array.
For the specified jobscript-block, retrieve the task-actions array.
For the specified jobscript-block, retrieve the task-elements mapping.
Retrieve all loops, including pending.
Retrieve loops by index (ID), including pending.
Get the name of the workflow.
Get whether the parameters with the given IDs are set.
Get the sources of the parameters with the given IDs.
Get parameters with the given IDs.
Retrieve the values of submission attributes that are stored at submit-time.
Retrieve all submissions, including pending.
Get submissions with the given IDs.
Get a task.
Get element data by indices within a given task.
Retrieve all tasks, including pending.
Get tasks with the given IDs.
Get the workflow template.
Get all template components, including pending.
Retrieve the contents of a text file stored within the workflow.
Get the format for timestamps.
Get the format for timestamps to use in names.
Generate a store for testing purposes.
Context manager for using the parameters-metadata cache.
Generate a valid store from a specification in terms of nested elements/iterations/EARs.
Rechunk the parameter data to be stored more efficiently.
Rechunk the run data to be stored more efficiently.
Reinstate the directory containing replaced workflow details.
Try very hard to delete a directory or file.
Remove the directory containing replaced workflow details.
Revert the replaced workflow path to its original name.
Commit pending changes to disk, if not in batch-update mode.
Mark an element action run as finished.
Mark element action runs as skipped for the specified reasons.
Mark an element action run as started.
Mark an element action run as initialised.
Set details of a file, including whether it is associated with a parameter.
Set the metadata for a job script.
Set the value of a parameter.
Set multiple non-file parameter values by parameter IDs.
Set the run submission data, like the submission index for an element action run.
Convert this store into expanded form.
Update metadata that is set at submit-time.
Update data indices of one or more iterations.
Add iterations to a loop.
Set the parents of a loop.
Set the source of a parameter.
Update data indices of one or more runs.
Context manager for managing StoreResource objects associated with the store.
Write an empty persistent workflow.
Convert the persistent store to zipped form.
Attributes
Cache for persistent EARs.
Cache for persistent elements.
Cache for persistent element iterations.
Whether there are any pending changes.
Does this store support workflow submission?
The logger to use.
Cache for total number of persistent EARs.
Cache for number of persistent tasks.
Cache for persistent parameter sources.
Cache for persistent parameters.
Cache for persistent tasks.
The format for timestamps.
Whether to use a cache.
The workflow this relates to.
The underlying store object.
- Parameters:
path (str | Path) –
fs (AbstractFileSystem) –
- add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata=None, save=True)#
Add a new EAR to an element iteration.
- add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#
Add a new element to a task.
- add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#
Add a new iteration to an element.
- add_element_set(task_id, es_js, save=True)#
Add an element set to a task.
- add_file(store_contents, is_input, source, path, contents=None, filename=None, save=True)#
Add a file that will be associated with a parameter.
- add_loop(loop_template, iterable_parameters, output_parameters, parents, num_added_iterations, iter_IDs, save=True)#
Add a new loop to the workflow.
- add_set_parameter(data, source, save=True)#
Add a parameter that is set to a value.
- add_submission(sub_idx, sub_js, save=True)#
Add a new submission.
- add_task(idx, task_template, save=True)#
Add a new task to the workflow.
- add_template_components(temp_comps, save=True)#
Add template components to the workflow.
- add_unset_parameter(source, save=True)#
Add a parameter that is not set to any value.
- Parameters:
source (ParamSource) –
save (bool) –
- Return type:
- cache_ctx()#
Context manager for using the persistent element/iteration/run cache.
- Return type:
Iterator[None]
- cached_load()#
Context manager to cache the root attributes.
- Return type:
Iterator[None]
- check_parameters_exist(ids)#
For each parameter ID, return True if it exists, else False.
- clear_jobscript_at_submit_metadata_cache()#
Clear the cache of at-submit-time jobscript metadata.
- copy(path=None)#
Copy the workflow store.
This does not work on remote filesystems.
- Parameters:
path (PathLike) –
- Return type:
Path
- delete()#
Delete the persistent workflow.
- Return type:
None
- delete_no_confirm()#
Permanently delete the workflow data with no confirmation.
- Return type:
None
- get_EAR_skipped(EAR_ID)#
Whether the element action run with the given ID was skipped.
- get_EARs(ids)#
Get element action runs with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySEAR]
- get_creation_info()#
Get information about the creation of the workflow.
- get_dirs_array()#
Retrieve the run directories array.
- Return type:
NDArray
- get_element_iterations(ids)#
Get element iterations with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySElementIter]
- get_elements(ids)#
Get elements with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySElement]
- get_jobscript_at_submit_metadata(sub_idx, js_idx, metadata_attr)#
For the specified jobscript, retrieve the values of jobscript-submit-time attributes.
Notes
If the cache does not exist, this method will retrieve and cache metadata for all jobscripts for which metadata has been set. If the cache does exist, but not for the requested jobscript, then this method will retrieve and cache metadata for all non-cached jobscripts for which metadata has been set. If metadata has not yet been set for the specified jobscript, a dict with all None values will be returned.
The cache can be cleared using the method clear_jobscript_at_submit_metadata_cache.
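The caching behaviour described in these notes can be illustrated with a small stand-alone sketch. It does not use hpcflow: the class `JobscriptMetadataCacheSketch`, the `ATTRS` tuple and the `num_reads` counter are hypothetical names chosen for illustration, and the `metadata_attr` argument of the real method is omitted for brevity.

```python
from __future__ import annotations

from typing import Any

# Hypothetical attribute names; the real set of at-submit-time attributes
# is not listed in this reference.
ATTRS = ("submit_time", "submit_hostname", "scheduler_job_ID")


class JobscriptMetadataCacheSketch:
    """Toy stand-in for the lazy cache described in the notes above."""

    def __init__(self, stored: dict[tuple[int, int], dict[str, Any]]) -> None:
        # `stored` plays the role of persisted metadata, keyed by (sub_idx, js_idx).
        self._stored = stored
        self._cache: dict[tuple[int, int], dict[str, Any]] | None = None
        self.num_reads = 0  # counts simulated reads from the backing store

    def _load_uncached(self) -> None:
        # Create the cache if needed, then load every jobscript that has
        # metadata set but is not yet cached.
        if self._cache is None:
            self._cache = {}
        for key, meta in self._stored.items():
            if key not in self._cache:
                self.num_reads += 1
                self._cache[key] = dict(meta)

    def get(self, sub_idx: int, js_idx: int) -> dict[str, Any]:
        key = (sub_idx, js_idx)
        if self._cache is None or key not in self._cache:
            self._load_uncached()
        # Metadata not yet set for this jobscript: return all-None values.
        return self._cache.get(key, dict.fromkeys(ATTRS))

    def clear(self) -> None:
        # Counterpart of clear_jobscript_at_submit_metadata_cache.
        self._cache = None
```

In this sketch, a second call to `get` for an already-cached jobscript performs no further reads, and calling `clear` forces the next `get` to reload.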
- get_jobscript_block_dependencies(sub_idx, js_idx, blk_idx, js_dependencies)#
For the specified jobscript-block, retrieve the dependencies.
- get_jobscript_block_run_ID_array(sub_idx, js_idx, blk_idx, run_ID_arr)#
For the specified jobscript-block, retrieve the run ID array.
- get_jobscript_block_task_actions_array(sub_idx, js_idx, blk_idx, task_actions_arr)#
For the specified jobscript-block, retrieve the task-actions array.
- get_jobscript_block_task_elements_map(sub_idx, js_idx, blk_idx, task_elems_map)#
For the specified jobscript-block, retrieve the task-elements mapping.
- get_loops()#
Retrieve all loops, including pending.
- Return type:
- get_loops_by_IDs(ids)#
Retrieve loops by index (ID), including pending.
- Parameters:
ids (Iterable[int]) –
- Return type:
- get_name()#
Get the name of the workflow.
- get_parameter_set_statuses(ids)#
Get whether the parameters with the given IDs are set.
- get_parameter_sources(ids)#
Get the sources of the parameters with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
- get_parameters(ids, **kwargs)#
Get parameters with the given IDs.
- get_submission_at_submit_metadata(sub_idx, metadata_attr)#
Retrieve the values of submission attributes that are stored at submit-time.
- get_submissions()#
Retrieve all submissions, including pending.
- get_submissions_by_ID(ids)#
Get submissions with the given IDs.
- get_task_elements(task_id, idx_lst=None)#
Get element data by indices within a given task.
Element iterations and EARs belonging to the elements are included.
- get_tasks_by_IDs(ids)#
Get tasks with the given IDs.
- Parameters:
ids (Iterable[int]) –
- Return type:
Sequence[AnySTask]
- get_template_components()#
Get all template components, including pending.
- get_text_file(path)#
Retrieve the contents of a text file stored within the workflow.
- get_ts_fmt()#
Get the format for timestamps.
- get_ts_name_fmt()#
Get the format for timestamps to use in names.
- classmethod make_test_store_from_spec(spec, dir=None, path='test_store', overwrite=False)#
Generate a store for testing purposes.
- property param_sources_cache: dict[int, ParamSource]#
Cache for persistent parameter sources.
- parameters_metadata_cache()#
Context manager for using the parameters-metadata cache.
Notes
This method can be overridden by a subclass to provide an implementation-specific cache of metadata associated with parameters, or even parameter data itself.
Using this cache precludes writing/setting parameter data.
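As a rough illustration of why a metadata cache and writes do not mix, the following stand-alone sketch snapshots "is set" metadata inside a context manager and refuses writes while the snapshot is active. It is not the hpcflow implementation: `ParamStoreSketch`, `is_set` and the use of `RuntimeError` are assumptions made for the example, and how the real store enforces the restriction is not stated here.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Iterator


class ParamStoreSketch:
    """Toy parameter store; all names here are hypothetical."""

    def __init__(self) -> None:
        self._values: dict[int, Any] = {}
        self._metadata_cache: dict[int, bool] | None = None

    @contextmanager
    def parameters_metadata_cache(self) -> Iterator[None]:
        # Snapshot the "is set" metadata for the duration of the block.
        self._metadata_cache = {k: v is not None for k, v in self._values.items()}
        try:
            yield
        finally:
            self._metadata_cache = None

    def set_parameter_value(self, param_id: int, value: Any) -> None:
        if self._metadata_cache is not None:
            # A write would make the cached snapshot stale, so refuse it.
            raise RuntimeError("cannot set parameter data while the metadata cache is in use")
        self._values[param_id] = value

    def is_set(self, param_id: int) -> bool:
        # Read from the snapshot when it is active, else from the live data.
        if self._metadata_cache is not None:
            return self._metadata_cache.get(param_id, False)
        return self._values.get(param_id) is not None
```

Reads inside the `with` block are served from the snapshot; writes become possible again once the block exits.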
- static prepare_test_store_from_spec(task_spec)#
Generate a valid store from a specification in terms of nested elements/iterations/EARs.
- rechunk_parameter_base(chunk_size=None, backup=True, status=True)#
Rechunk the parameter data to be stored more efficiently.
- rechunk_runs(chunk_size=None, backup=True, status=True)#
Rechunk the run data to be stored more efficiently.
- reinstate_replaced_dir()#
Reinstate the directory containing replaced workflow details.
- Return type:
None
- remove_path(path)#
Try very hard to delete a directory or file.
Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is the case on a failed workflow creation (e.g. missing inputs). So, in addition to catching PermissionErrors generated when Dropbox has a lock on files, we repeatedly try deleting the directory tree.
- Parameters:
path (str | Path) –
- Return type:
None
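The retry strategy described above can be sketched with the standard library alone. This is not the hpcflow code: the function name `remove_path_with_retries` and the `max_tries` and `delay` values are illustrative assumptions, and the sketch catches `OSError` (of which `PermissionError` is a subclass) so that a "directory not empty" failure caused by a re-sync is also retried.

```python
import shutil
import time
from pathlib import Path


def remove_path_with_retries(path, max_tries=5, delay=0.1):
    """Repeatedly try to delete a file or directory tree.

    `max_tries` and `delay` are illustrative values, not taken from hpcflow.
    Returns True if the path no longer exists afterwards.
    """
    path = Path(path)
    for _ in range(max_tries):
        if not path.exists():
            return True
        try:
            if path.is_dir():
                shutil.rmtree(path)
            else:
                path.unlink()
        except OSError:
            # Another process (e.g. a sync client) may hold a lock on a file
            # or have re-created part of the tree; wait and try again.
            time.sleep(delay)
    return not path.exists()
```

Returning a boolean rather than raising lets the caller decide whether a leftover directory is fatal; the real method is documented as returning None.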
- remove_replaced_dir()#
Remove the directory containing replaced workflow details.
- Return type:
None
- rename_path(replaced, original)#
Revert the replaced workflow path to its original name.
This happens when new workflow creation fails and there is an existing workflow with the same name; the original workflow, which was renamed, must be reverted.
- save()#
Commit pending changes to disk, if not in batch-update mode.
- Return type:
None
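Many methods on this class take a `save=True` argument, and `save()` commits pending changes unless the store is in batch-update mode. The toy class below sketches one way such a pattern can work. It is a simplified illustration, not the hpcflow implementation: `PendingStoreSketch`, `add_item` and the `batch_update` context manager are hypothetical names.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Iterator


class PendingStoreSketch:
    """Toy store that separates pending changes from committed data."""

    def __init__(self) -> None:
        self.committed: list[str] = []  # stands in for data on disk
        self.pending: list[str] = []    # changes not yet committed
        self._in_batch_mode = False

    @property
    def has_pending(self) -> bool:
        return bool(self.pending)

    def add_item(self, item: str, save: bool = True) -> None:
        # Record the change as pending, then optionally commit straight away.
        self.pending.append(item)
        if save:
            self.save()

    def save(self) -> None:
        # In batch-update mode, defer the commit until the batch ends.
        if self._in_batch_mode:
            return
        self.committed.extend(self.pending)
        self.pending.clear()

    @contextmanager
    def batch_update(self) -> Iterator[None]:
        self._in_batch_mode = True
        try:
            yield
        finally:
            self._in_batch_mode = False
            self.save()
```

Passing `save=False` leaves a change pending until a later `save()`, while changes made inside `batch_update()` are committed together when the block exits.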
- set_EAR_end(EAR_ID, exit_code, success, snapshot, save=True)#
Mark an element action run as finished.
- set_EAR_skip(skip_reasons, save=True)#
Mark element action runs as skipped for the specified reasons.
- set_EAR_start(EAR_ID, run_dir, port_number, save=True)#
Mark an element action run as started.
- set_EARs_initialised(iter_ID, save=True)#
Mark an element action run as initialised.
- set_file(store_contents, is_input, param_id, path, contents=None, filename=None, clean_up=False, save=True)#
Set details of a file, including whether it is associated with a parameter.
- set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, shell_idx=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)#
Set the metadata for a job script.
- Parameters:
sub_idx (int) –
js_idx (int) –
version_info (VersionInfo | None) –
submit_time (str | None) –
submit_hostname (str | None) –
submit_machine (str | None) –
shell_idx (int | None) –
os_name (str | None) –
shell_name (str | None) –
scheduler_name (str | None) –
scheduler_job_ID (str | None) –
process_ID (int | None) –
save (bool) –
- set_multi_run_ends(run_ids, run_dirs, exit_codes, successes, save=True)#
- set_multi_run_starts(run_ids, run_dirs, port_number, save=True)#
- set_parameter_value(param_id, value, is_file=False, save=True)#
Set the value of a parameter.
- set_parameter_values(values, save=True)#
Set multiple non-file parameter values by parameter IDs.
- set_run_dirs(run_dir_indices, run_idx, save=True)#
- set_run_submission_data(EAR_ID, cmds_ID, sub_idx, save=True)#
Set the run submission data, like the submission index for an element action run.
- unzip(path='.', log=None)#
Convert this store into expanded form.
- update_at_submit_metadata(sub_idx, submission_parts, save=True)#
Update metadata that is set at submit-time.
- update_iter_data_indices(data_indices)#
Update data indices of one or more iterations.
- update_loop_num_iters(index, num_added_iters, save=True)#
Add iterations to a loop.
- update_loop_parents(index, num_added_iters, parents, save=True)#
Set the parents of a loop.
- update_param_source(param_sources, save=True)#
Set the source of a parameter.
- Parameters:
param_sources (Mapping[int, ParamSource]) –
save (bool) –
- Return type:
None
- update_run_data_indices(data_indices)#
Update data indices of one or more runs.
- using_resource(res_label, action)#
Context manager for managing StoreResource objects associated with the store.
- Parameters:
res_label (Literal['metadata', 'submissions', 'parameters', 'attrs', 'runs']) –
action (str) –
- Return type:
Iterator[Any]
- classmethod write_empty_workflow(app, *, template_js, template_components_js, wk_path, fs, name, replaced_wk, ts_fmt, ts_name_fmt, creation_info, compressor='blosc', compressor_kwargs=None)#
Write an empty persistent workflow.
- Parameters:
app (BaseApp) –
template_js (TemplateMeta) –
wk_path (str) –
fs (AbstractFileSystem) –
name (str) –
replaced_wk (str | None) –
ts_fmt (str) –
ts_name_fmt (str) –
creation_info (StoreCreationInfo) –
compressor (str | None) –
- Return type:
None
- property zarr_store: Store#
The underlying store object.
- zip(path='.', log=None, overwrite=False, include_execute=False, include_rechunk_backups=False)#
Convert the persistent store to zipped form.
- Parameters:
path (str) – Path at which to create the new zipped workflow. If this is an existing directory, the zip file will be created within this directory. Otherwise, this path is assumed to be the full file path to the new zip file.
log (str | None) –
overwrite (bool) –
include_execute (bool) –
include_rechunk_backups (bool) –
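The two interpretations of `path` described above can be sketched as a small helper. This is an illustration only: `resolve_zip_path` is a hypothetical function, and `default_name` is a placeholder, since the file name hpcflow uses when given an existing directory is not stated in this reference.

```python
from pathlib import Path


def resolve_zip_path(path, default_name="workflow.zip"):
    """Return the file path at which a zipped workflow would be written.

    `default_name` is a hypothetical placeholder for the name used when
    `path` is an existing directory.
    """
    target = Path(path)
    if target.is_dir():
        # Existing directory: create the zip file within it.
        return target / default_name
    # Otherwise, treat `path` as the full file path of the new zip file.
    return target
```

With the default `path='.'`, the first branch applies and the zip file would be placed in the current working directory.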