hpcflow.sdk.persistence.zarr.ZarrPersistentStore#

class hpcflow.sdk.persistence.zarr.ZarrPersistentStore(app, workflow, path, fs)#

Bases: PersistentStore[ZarrStoreTask, ZarrStoreElement, ZarrStoreElementIter, ZarrStoreEAR, ZarrStoreParameter]

A persistent store implemented using Zarr.

Methods

add_EAR

Add a new EAR to an element iteration.

add_element

Add a new element to a task.

add_element_iteration

Add a new iteration to an element.

add_element_set

Add an element set to a task.

add_file

Add a file that will be associated with a parameter.

add_loop

Add a new loop to the workflow.

add_set_parameter

Add a parameter that is set to a value.

add_submission

Add a new submission.

add_task

Add a new task to the workflow.

add_template_components

Add template components to the workflow.

add_unset_parameter

Add a parameter that is not set to any value.

cache_ctx

Context manager for using the persistent element/iteration/run cache.

cached_load

Context manager to cache the root attributes.

check_parameters_exist

For each parameter ID, return True if it exists, else False.

clear_jobscript_at_submit_metadata_cache

Clear the cache of at-submit-time jobscript metadata.

copy

Copy the workflow store.

delete

Delete the persistent workflow.

delete_no_confirm

Permanently delete the workflow data with no confirmation.

get_EAR_skipped

Whether the element action run with the given ID was skipped.

get_EARs

Get element action runs with the given IDs.

get_creation_info

Get information about the creation of the workflow.

get_dirs_array

Retrieve the run directories array.

get_element_iterations

Get element iterations with the given IDs.

get_elements

Get elements with the given IDs.

get_jobscript_at_submit_metadata

For the specified jobscript, retrieve the values of jobscript-submit-time attributes.

get_jobscript_block_dependencies

For the specified jobscript-block, retrieve the dependencies.

get_jobscript_block_run_ID_array

For the specified jobscript-block, retrieve the run ID array.

get_jobscript_block_task_actions_array

For the specified jobscript-block, retrieve the task-actions array.

get_jobscript_block_task_elements_map

For the specified jobscript-block, retrieve the task-elements mapping.

get_loops

Retrieve all loops, including pending.

get_loops_by_IDs

Retrieve loops by index (ID), including pending.

get_name

Get the name of the workflow.

get_parameter_set_statuses

Get whether the parameters with the given IDs are set.

get_parameter_sources

Get the sources of the parameters with the given IDs.

get_parameters

Get parameters with the given IDs.

get_submission_at_submit_metadata

Retrieve the values of submission attributes that are stored at submit-time.

get_submissions

Retrieve all submissions, including pending.

get_submissions_by_ID

Get submissions with the given IDs.

get_task

Get a task.

get_task_elements

Get element data by indices within a given task.

get_tasks

Retrieve all tasks, including pending.

get_tasks_by_IDs

Get tasks with the given IDs.

get_template

Get the workflow template.

get_template_components

Get all template components, including pending.

get_text_file

Retrieve the contents of a text file stored within the workflow.

get_ts_fmt

Get the format for timestamps.

get_ts_name_fmt

Get the format for timestamps to use in names.

make_test_store_from_spec

Generate a store for testing purposes.

parameters_metadata_cache

Context manager for using the parameters-metadata cache.

prepare_test_store_from_spec

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

rechunk_parameter_base

Rechunk the parameter data to be stored more efficiently.

rechunk_runs

Rechunk the run data to be stored more efficiently.

reinstate_replaced_dir

Reinstate the directory containing replaced workflow details.

remove_path

Try very hard to delete a directory or file.

remove_replaced_dir

Remove the directory containing replaced workflow details.

rename_path

Revert the replaced workflow path to its original name.

save

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end

Mark an element action run as finished.

set_EAR_skip

Mark element action runs as skipped for the specified reasons.

set_EAR_start

Mark an element action run as started.

set_EARs_initialised

Mark the element action runs of an element iteration as initialised.

set_file

Set details of a file, including whether it is associated with a parameter.

set_jobscript_metadata

Set the metadata for a job script.

set_multi_run_ends

Mark multiple element action runs as finished.

set_multi_run_starts

Mark multiple element action runs as started.

set_parameter_value

Set the value of a parameter.

set_parameter_values

Set multiple non-file parameter values by parameter IDs.

set_run_dirs

Set the run directories for one or more runs.

set_run_submission_data

Set the run submission data, like the submission index for an element action run.

unzip

Convert this store into expanded form.

update_at_submit_metadata

Update metadata that is set at submit-time.

update_iter_data_indices

Update data indices of one or more iterations.

update_loop_num_iters

Add iterations to a loop.

update_loop_parents

Set the parents of a loop.

update_param_source

Set the source of a parameter.

update_run_data_indices

Update data indices of one or more runs.

using_resource

Context manager for managing StoreResource objects associated with the store.

write_empty_workflow

Write an empty persistent workflow.

zip

Convert the persistent store to zipped form.

Attributes

EAR_cache

Cache for persistent EARs.

element_cache

Cache for persistent elements.

element_iter_cache

Cache for persistent element iterations.

has_pending

Whether there are any pending changes.

is_submittable

Does this store support workflow submission?

logger

The logger to use.

num_EARs_cache

Cache for total number of persistent EARs.

num_params_cache

Cache for number of persistent parameters.

num_tasks_cache

Cache for number of persistent tasks.

param_sources_cache

Cache for persistent parameter sources.

parameter_cache

Cache for persistent parameters.

task_cache

Cache for persistent tasks.

ts_fmt

The format for timestamps.

use_cache

Whether to use a cache.

workflow

The workflow this relates to.

zarr_store

The underlying store object.

Parameters:

property EAR_cache: dict[int, AnySEAR]#

Cache for persistent EARs.

add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata=None, save=True)#

Add a new EAR to an element iteration.

Parameters:
  • elem_iter_ID (int) –

  • action_idx (int) –

  • commands_idx (list[int]) –

  • data_idx (DataIndex) –

  • metadata (Metadata | None) –

  • save (bool) –

Return type:

int

add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#

Add a new element to a task.

Parameters:
Return type:

int

add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#

Add a new iteration to an element.

Parameters:
  • element_ID (int) –

  • data_idx (DataIndex) –

  • schema_parameters (list[str]) –

  • loop_idx (Mapping[str, int] | None) –

  • save (bool) –

Return type:

int

add_element_set(task_id, es_js, save=True)#

Add an element set to a task.

Parameters:
  • task_id (int) –

  • es_js (Mapping) –

  • save (bool) –

add_file(store_contents, is_input, source, path, contents=None, filename=None, save=True)#

Add a file that will be associated with a parameter.

Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • source (ParamSource) –

  • path (Path | str) –

  • contents (str | None) –

  • filename (str | None) –

  • save (bool) –

add_loop(loop_template, iterable_parameters, output_parameters, parents, num_added_iterations, iter_IDs, save=True)#

Add a new loop to the workflow.

Parameters:
  • loop_template (Mapping[str, Any]) –

  • iterable_parameters (Mapping[str, IterableParam]) –

  • output_parameters (Mapping[str, int]) –

  • parents (Sequence[str]) –

  • num_added_iterations (Mapping[tuple[int, ...], int]) –

  • iter_IDs (Iterable[int]) –

  • save (bool) –

add_set_parameter(data, source, save=True)#

Add a parameter that is set to a value.

Parameters:
Return type:

int

add_submission(sub_idx, sub_js, save=True)#

Add a new submission.

Parameters:
  • sub_idx (int) –

  • sub_js (Mapping[str, JSONed]) –

  • save (bool) –

add_task(idx, task_template, save=True)#

Add a new task to the workflow.

Parameters:
  • idx (int) –

  • task_template (Mapping) –

  • save (bool) –

add_template_components(temp_comps, save=True)#

Add template components to the workflow.

Parameters:
Return type:

None

add_unset_parameter(source, save=True)#

Add a parameter that is not set to any value.

Parameters:
Return type:

int

cache_ctx()#

Context manager for using the persistent element/iteration/run cache.

Return type:

Iterator[None]

cached_load()#

Context manager to cache the root attributes.

Return type:

Iterator[None]
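The two context managers above enable a temporary in-memory cache that is discarded on exit. A minimal, self-contained sketch of that pattern (the `CacheSketch` class, `get_element`, and `num_loads` are hypothetical illustrations, not the real store API):

```python
from contextlib import contextmanager

class CacheSketch:
    """Hypothetical sketch of the cache_ctx pattern: enable an in-memory
    cache for the duration of a block, then discard it on exit."""

    def __init__(self):
        self.use_cache = False
        self.element_cache = {}
        self.num_loads = 0  # counts simulated reads from the Zarr store

    def _load_element(self, el_id):
        self.num_loads += 1
        return {"id": el_id}

    def get_element(self, el_id):
        if self.use_cache and el_id in self.element_cache:
            return self.element_cache[el_id]
        elem = self._load_element(el_id)
        if self.use_cache:
            self.element_cache[el_id] = elem
        return elem

    @contextmanager
    def cache_ctx(self):
        self.use_cache = True
        try:
            yield
        finally:
            self.use_cache = False
            self.element_cache.clear()

store = CacheSketch()
with store.cache_ctx():
    store.get_element(0)
    store.get_element(0)  # second access served from the cache
```

After the `with` block, the cache is empty again and only one simulated load has occurred.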

check_parameters_exist(ids)#

For each parameter ID, return True if it exists, else False.

Parameters:

ids (Sequence[int]) –

Return type:

Iterator[bool]
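The return type is a lazy iterator of booleans, one per requested ID, in request order. A hypothetical sketch of that shape (set membership stands in for the real persistence lookup):

```python
def check_parameters_exist_sketch(existing_ids, ids):
    """Hypothetical sketch: lazily yield True/False per requested
    parameter ID, preserving the order of `ids`."""
    known = set(existing_ids)
    return (i in known for i in ids)

# IDs 10 and 13 exist; 12 does not
result = list(check_parameters_exist_sketch([10, 11, 13], [10, 12, 13]))
```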

clear_jobscript_at_submit_metadata_cache()#

Clear the cache of at-submit-time jobscript metadata.

copy(path=None)#

Copy the workflow store.

This does not work on remote filesystems.

Parameters:

path (PathLike) –

Return type:

Path

delete()#

Delete the persistent workflow.

Return type:

None

delete_no_confirm()#

Permanently delete the workflow data with no confirmation.

Return type:

None

property element_cache: dict[int, AnySElement]#

Cache for persistent elements.

property element_iter_cache: dict[int, AnySElementIter]#

Cache for persistent element iterations.

get_EAR_skipped(EAR_ID)#

Whether the element action run with the given ID was skipped.

Parameters:

EAR_ID (int) –

Return type:

int

get_EARs(ids)#

Get element action runs with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySEAR]

get_creation_info()#

Get information about the creation of the workflow.

get_dirs_array()#

Retrieve the run directories array.

Return type:

NDArray

get_element_iterations(ids)#

Get element iterations with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySElementIter]

get_elements(ids)#

Get elements with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySElement]

get_jobscript_at_submit_metadata(sub_idx, js_idx, metadata_attr)#

For the specified jobscript, retrieve the values of jobscript-submit-time attributes.

Notes

If the cache does not exist, this method will retrieve and cache metadata for all jobscripts for which metadata has been set. If the cache does exist, but not for the requested jobscript, this method will retrieve and cache metadata for all non-cached jobscripts for which metadata has been set. If metadata has not yet been set for the specified jobscript, a dict with all None values will be returned.

The cache can be cleared using the method clear_jobscript_at_submit_metadata_cache.

Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • metadata_attr (dict | None) –

Return type:

dict[str, Any]
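The cache-fill rule in the notes above can be sketched in miniature. This is a hypothetical illustration simplified to a single jobscript index; the dict keys and helper name are invented:

```python
NULL_METADATA = {"submit_time": None, "submit_hostname": None}

def get_at_submit_metadata(cache, stored, js_idx):
    """Hypothetical sketch of the at-submit metadata cache-fill rule:
    on a cache miss, cache everything that is stored but not yet cached;
    metadata that was never set yields a dict of all-None values."""
    if js_idx not in cache:
        for idx, meta in stored.items():
            cache.setdefault(idx, meta)
    return cache.get(js_idx, dict(NULL_METADATA))

stored = {0: {"submit_time": "2024-01-01", "submit_hostname": "node1"}}
cache = {}
meta0 = get_at_submit_metadata(cache, stored, 0)  # set: real values
meta1 = get_at_submit_metadata(cache, stored, 1)  # unset: all None
```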

get_jobscript_block_dependencies(sub_idx, js_idx, blk_idx, js_dependencies)#

For the specified jobscript-block, retrieve the dependencies.

Parameters:
Return type:

dict[tuple[int, int], ResolvedJobscriptBlockDependencies]

get_jobscript_block_run_ID_array(sub_idx, js_idx, blk_idx, run_ID_arr)#

For the specified jobscript-block, retrieve the run ID array.

Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • blk_idx (int) –

  • run_ID_arr (NDArray | None) –

Return type:

NDArray

get_jobscript_block_task_actions_array(sub_idx, js_idx, blk_idx, task_actions_arr)#

For the specified jobscript-block, retrieve the task-actions array.

Parameters:
Return type:

NDArray

get_jobscript_block_task_elements_map(sub_idx, js_idx, blk_idx, task_elems_map)#

For the specified jobscript-block, retrieve the task-elements mapping.

Parameters:
Return type:

dict[int, list[int]]

get_loops()#

Retrieve all loops, including pending.

Return type:

dict[int, LoopDescriptor]

get_loops_by_IDs(ids)#

Retrieve loops by index (ID), including pending.

Parameters:

ids (Iterable[int]) –

Return type:

dict[int, LoopDescriptor]

get_name()#

Get the name of the workflow.

get_parameter_set_statuses(ids)#

Get whether the parameters with the given IDs are set.

Parameters:

ids (Iterable[int]) –

Return type:

list[bool]

get_parameter_sources(ids)#

Get the sources of the parameters with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

list[ParamSource]

get_parameters(ids, **kwargs)#

Get parameters with the given IDs.

Parameters:

ids (Iterable[int]) – The IDs of the parameters to get.

Keyword Arguments:

dataset_copy (bool) – For Zarr stores only. If True, copy arrays as NumPy arrays.

Return type:

list[AnySParameter]

get_submission_at_submit_metadata(sub_idx, metadata_attr)#

Retrieve the values of submission attributes that are stored at submit-time.

Parameters:
  • sub_idx (int) –

  • metadata_attr (dict | None) –

Return type:

dict[str, Any]

get_submissions()#

Retrieve all submissions, including pending.

Return type:

dict[int, Mapping[str, JSONed]]

get_submissions_by_ID(ids)#

Get submissions with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

dict[int, Mapping[str, JSONed]]

get_task(task_idx)#

Get a task.

Parameters:

task_idx (int) –

Return type:

AnySTask

get_task_elements(task_id, idx_lst=None)#

Get element data by indices within a given task.

Element iterations and EARs belonging to the elements are included.

Parameters:
  • task_id (int) –

  • idx_lst (Iterable[int] | None) –

Return type:

Iterator[Mapping[str, Any]]

get_tasks()#

Retrieve all tasks, including pending.

Return type:

list[AnySTask]

get_tasks_by_IDs(ids)#

Get tasks with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySTask]

get_template()#

Get the workflow template.

Return type:

dict[str, JSONed]

get_template_components()#

Get all template components, including pending.

Return type:

dict[str, Any]

get_text_file(path)#

Retrieve the contents of a text file stored within the workflow.

Parameters:

path (str | Path) – The path to a text file stored within the workflow. This can either be an absolute path or a path that is relative to the workflow root.

Return type:

str

get_ts_fmt()#

Get the format for timestamps.

get_ts_name_fmt()#

Get the format for timestamps to use in names.

property has_pending: bool#

Whether there are any pending changes.

property is_submittable: bool#

Does this store support workflow submission?

property logger: Logger#

The logger to use.

classmethod make_test_store_from_spec(spec, dir=None, path='test_store', overwrite=False)#

Generate a store for testing purposes.

property num_EARs_cache: int | None#

Cache for total number of persistent EARs.

property num_params_cache: int | None#

Cache for number of persistent parameters.

property num_tasks_cache: int | None#

Cache for number of persistent tasks.

property param_sources_cache: dict[int, ParamSource]#

Cache for persistent parameter sources.

property parameter_cache: dict[int, AnySParameter]#

Cache for persistent parameters.

parameters_metadata_cache()#

Context manager for using the parameters-metadata cache.

Notes

This method can be overridden by a subclass to provide an implementation-specific cache of metadata associated with parameters, or even parameter data itself.

Using this cache precludes writing/setting parameter data.

static prepare_test_store_from_spec(task_spec)#

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

Parameters:

task_spec (Sequence[Mapping[str, Sequence[Mapping[str, Sequence[Mapping[str, Sequence]]]]]]) –

Return type:

tuple[list[dict], list[dict], list[dict], list[dict]]

rechunk_parameter_base(chunk_size=None, backup=True, status=True)#

Rechunk the parameter data to be stored more efficiently.

Parameters:
  • chunk_size (int | None) –

  • backup (bool) –

  • status (bool) –

Return type:

Array

rechunk_runs(chunk_size=None, backup=True, status=True)#

Rechunk the run data to be stored more efficiently.

Parameters:
  • chunk_size (int | None) –

  • backup (bool) –

  • status (bool) –

Return type:

Array
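Both rechunk methods regroup data that was written in many small chunks into fewer, larger ones. A hypothetical pure-Python illustration of the regrouping idea (here `chunk_size=None` is assumed to mean one chunk holding everything; the real methods operate on Zarr arrays):

```python
def rechunk_sketch(items, chunk_size=None):
    """Hypothetical illustration of rechunking: regroup per-item pieces
    into blocks of `chunk_size`; None is assumed here to mean a single
    chunk containing all items."""
    n = chunk_size if chunk_size is not None else len(items)
    return [items[i:i + n] for i in range(0, len(items), n)]

pairs = rechunk_sketch([1, 2, 3, 4, 5], 2)   # blocks of two
single = rechunk_sketch([1, 2, 3])           # one consolidated chunk
```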

reinstate_replaced_dir()#

Reinstate the directory containing replaced workflow details.

Return type:

None

remove_path(path)#

Try very hard to delete a directory or file.

Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which happens when workflow creation fails (e.g. due to missing inputs). So, in addition to catching the PermissionErrors raised while Dropbox holds a lock on files, we repeatedly retry deleting the directory tree.

Parameters:

path (str | Path) –

Return type:

None
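The retry-on-PermissionError loop described above can be sketched as follows. This is a hypothetical simplification (retry count, wait time, and function name are invented), not the actual implementation:

```python
import shutil
import tempfile
import time
from pathlib import Path

def remove_path_sketch(path, retries=5, wait=0.1):
    """Hypothetical sketch of a persistent-delete loop: retry on
    PermissionError, e.g. while a sync client holds a file lock."""
    path = Path(path)
    for _ in range(retries):
        try:
            if path.is_dir():
                shutil.rmtree(path)
            elif path.exists():
                path.unlink()
            return
        except PermissionError:
            time.sleep(wait)  # give the locking process a chance to let go
    raise OSError(f"could not remove {path}")

tmp_dir = Path(tempfile.mkdtemp())
(tmp_dir / "inputs.json").write_text("{}")
remove_path_sketch(tmp_dir)
```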

remove_replaced_dir()#

Remove the directory containing replaced workflow details.

Return type:

None

rename_path(replaced, original)#

Revert the replaced workflow path to its original name.

This happens when new workflow creation fails and there is an existing workflow with the same name; the original workflow, which was renamed, must be reverted.

Parameters:
Return type:

None

save()#

Commit pending changes to disk, if not in batch-update mode.

Return type:

None
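The "commit pending changes, unless batch-updating" behaviour can be sketched with a minimal stand-in. The class and attribute names below are hypothetical; only the save/has_pending semantics mirror the documented behaviour:

```python
class PendingChangesSketch:
    """Hypothetical sketch of save() semantics: writes are queued as
    pending and only committed when not in batch-update mode."""

    def __init__(self):
        self._pending = []
        self._committed = []
        self.batch_update_mode = False

    @property
    def has_pending(self):
        return bool(self._pending)

    def add_task(self, task, save=True):
        self._pending.append(task)
        if save:
            self.save()

    def save(self):
        if not self.batch_update_mode:
            self._committed.extend(self._pending)
            self._pending.clear()

store = PendingChangesSketch()
store.batch_update_mode = True
store.add_task("t1")   # queued only; save() is a no-op in batch mode
queued = store.has_pending
store.batch_update_mode = False
store.save()           # now committed to "disk"
```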

set_EAR_end(EAR_ID, exit_code, success, snapshot, save=True)#

Mark an element action run as finished.

Parameters:
  • EAR_ID (int) –

  • exit_code (int) –

  • success (bool) –

  • snapshot (bool) –

  • save (bool) –

Return type:

datetime

set_EAR_skip(skip_reasons, save=True)#

Mark element action runs as skipped for the specified reasons.

Parameters:
Return type:

None

set_EAR_start(EAR_ID, run_dir, port_number, save=True)#

Mark an element action run as started.

Parameters:
  • EAR_ID (int) –

  • run_dir (Path | None) –

  • port_number (int | None) –

  • save (bool) –

Return type:

datetime

set_EARs_initialised(iter_ID, save=True)#

Mark the element action runs of an element iteration as initialised.

Parameters:
  • iter_ID (int) –

  • save (bool) –

Return type:

None

set_file(store_contents, is_input, param_id, path, contents=None, filename=None, clean_up=False, save=True)#

Set details of a file, including whether it is associated with a parameter.

Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • param_id (int | None) –

  • path (Path | str) –

  • contents (str | None) –

  • filename (str | None) –

  • clean_up (bool) –

  • save (bool) –

set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, shell_idx=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)#

Set the metadata for a job script.

Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • version_info (VersionInfo | None) –

  • submit_time (str | None) –

  • submit_hostname (str | None) –

  • submit_machine (str | None) –

  • shell_idx (int | None) –

  • submit_cmdline (list[str] | None) –

  • os_name (str | None) –

  • shell_name (str | None) –

  • scheduler_name (str | None) –

  • scheduler_job_ID (str | None) –

  • process_ID (int | None) –

  • save (bool) –

set_multi_run_ends(run_ids, run_dirs, exit_codes, successes, save=True)#

Mark multiple element action runs as finished.

Parameters:
Return type:

datetime

set_multi_run_starts(run_ids, run_dirs, port_number, save=True)#

Mark multiple element action runs as started.

Parameters:
  • run_ids (list[int]) –

  • run_dirs (list[Path | None]) –

  • port_number (int) –

  • save (bool) –

Return type:

datetime

set_parameter_value(param_id, value, is_file=False, save=True)#

Set the value of a parameter.

Parameters:
  • param_id (int) –

  • value (Any) –

  • is_file (bool) –

  • save (bool) –

set_parameter_values(values, save=True)#

Set multiple non-file parameter values by parameter IDs.

Parameters:
set_run_dirs(run_dir_indices, run_idx, save=True)#

Set the run directories for one or more runs.

Parameters:
set_run_submission_data(EAR_ID, cmds_ID, sub_idx, save=True)#

Set the run submission data, like the submission index for an element action run.

Parameters:
  • EAR_ID (int) –

  • cmds_ID (int | None) –

  • sub_idx (int) –

  • save (bool) –

Return type:

None

property task_cache: dict[int, AnySTask]#

Cache for persistent tasks.

property ts_fmt: str#

The format for timestamps.

unzip(path='.', log=None)#

Convert this store into expanded form.

Parameters:
  • path (str) –

  • log (str | None) –

update_at_submit_metadata(sub_idx, submission_parts, save=True)#

Update metadata that is set at submit-time.

Parameters:
update_iter_data_indices(data_indices)#

Update data indices of one or more iterations.

Parameters:

data_indices (dict[int, DataIndex]) –

update_loop_num_iters(index, num_added_iters, save=True)#

Add iterations to a loop.

Parameters:
Return type:

None

update_loop_parents(index, num_added_iters, parents, save=True)#

Set the parents of a loop.

Parameters:
  • index (int) –

  • num_added_iters (Mapping[tuple[int, ...], int]) –

  • parents (Sequence[str]) –

  • save (bool) –

Return type:

None

update_param_source(param_sources, save=True)#

Set the source of a parameter.

Parameters:
Return type:

None

update_run_data_indices(data_indices)#

Update data indices of one or more runs.

Parameters:

data_indices (dict[int, DataIndex]) –

property use_cache: bool#

Whether to use a cache.

using_resource(res_label, action)#

Context manager for managing StoreResource objects associated with the store.

Parameters:
  • res_label (Literal['metadata', 'submissions', 'parameters', 'attrs', 'runs']) –

  • action (str) –

Return type:

Iterator[Any]

property workflow: Workflow#

The workflow this relates to.

classmethod write_empty_workflow(app, *, template_js, template_components_js, wk_path, fs, name, replaced_wk, ts_fmt, ts_name_fmt, creation_info, compressor='blosc', compressor_kwargs=None)#

Write an empty persistent workflow.

Parameters:
Return type:

None

property zarr_store: Store#

The underlying store object.

zip(path='.', log=None, overwrite=False, include_execute=False, include_rechunk_backups=False)#

Convert the persistent store to zipped form.

Parameters:
  • path (str) – Path at which to create the new zipped workflow. If this is an existing directory, the zip file will be created within this directory. Otherwise, this path is assumed to be the full file path to the new zip file.

  • log (str | None) –

  • overwrite (bool) –

  • include_execute (bool) –

  • include_rechunk_backups (bool) –
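The `path` rule documented for zip() (existing directory → zip file created inside it; otherwise → full file path) can be sketched as a small resolver. The function and default file name below are hypothetical illustrations:

```python
import tempfile
from pathlib import Path

def resolve_zip_target(path, default_name="workflow.zip"):
    """Hypothetical sketch of the zip() `path` rule: an existing
    directory receives the zip file inside it; any other path is
    taken as the full zip-file path (default_name is invented)."""
    p = Path(path)
    return p / default_name if p.is_dir() else p

tmp = Path(tempfile.mkdtemp())
in_dir = resolve_zip_target(tmp)               # existing dir -> file inside
explicit = resolve_zip_target(tmp / "wk.zip")  # non-existent -> as given
```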