hpcflow.sdk.persistence.zarr.ZarrPersistentStore#

class hpcflow.sdk.persistence.zarr.ZarrPersistentStore(app, workflow, path, fs)#

Bases: PersistentStore[ZarrStoreTask, ZarrStoreElement, ZarrStoreElementIter, ZarrStoreEAR, ZarrStoreParameter]

A persistent store implemented using Zarr.

Methods

add_EAR

Add a new EAR to an element iteration.

add_element

Add a new element to a task.

add_element_iteration

Add a new iteration to an element.

add_element_set

Add an element set to a task.

add_file

Add a file that will be associated with a parameter.

add_loop

Add a new loop to the workflow.

add_set_parameter

Add a parameter that is set to a value.

add_submission

Add a new submission.

add_submission_part

Add a submission part.

add_task

Add a new task to the workflow.

add_template_components

Add template components to the workflow.

add_unset_parameter

Add a parameter that is not set to any value.

cache_ctx

Context manager for using the persistent element/iteration/run cache.

cached_load

Context manager to cache the root attributes.

check_parameters_exist

For each parameter ID, return True if it exists, else False.

copy

Copy the workflow store.

delete

Delete the persistent workflow.

delete_no_confirm

Permanently delete the workflow data with no confirmation.

get_EAR_skipped

Whether the element action run with the given ID was skipped.

get_EARs

Get element action runs with the given IDs.

get_creation_info

Get information about the creation of the workflow.

get_element_iterations

Get element iterations with the given IDs.

get_elements

Get elements with the given IDs.

get_loops

Retrieve all loops, including pending.

get_loops_by_IDs

Retrieve loops by index (ID), including pending.

get_name

Get the name of the workflow.

get_parameter_set_statuses

Get whether the parameters with the given IDs are set.

get_parameter_sources

Get the sources of the parameters with the given IDs.

get_parameters

Get parameters with the given IDs.

get_submissions

Retrieve all submissions, including pending.

get_submissions_by_ID

Get submissions with the given IDs.

get_task

Get a task.

get_task_elements

Get element data by an indices within a given task.

get_tasks

Retrieve all tasks, including pending.

get_tasks_by_IDs

Get tasks with the given IDs.

get_template

Get the workflow template.

get_template_components

Get all template components, including pending.

get_ts_fmt

Get the format for timestamps.

get_ts_name_fmt

Get the format for timestamps to use in names.

make_test_store_from_spec

Generate an store for testing purposes.

prepare_test_store_from_spec

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

rechunk_parameter_base

Rechunk the parameter data to be stored more efficiently.

rechunk_runs

Rechunk the run data to be stored more efficiently.

reinstate_replaced_dir

Reinstate the directory containing replaced workflow details.

remove_path

Try very hard to delete a directory or file.

remove_replaced_dir

Remove the directory containing replaced workflow details.

rename_path

Revert the replaced workflow path to its original name.

save

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end

Mark an element action run as finished.

set_EAR_skip

Mark an element action run as skipped.

set_EAR_start

Mark an element action run as started.

set_EAR_submission_index

Set the submission index for an element action run.

set_EARs_initialised

Mark an element action run as initialised.

set_file

Set details of a file, including whether it is associated with a parameter.

set_jobscript_metadata

Set the metadata for a job script.

set_parameter_value

Set the value of a parameter.

unzip

Convert this store into expanded form.

update_loop_num_iters

Add iterations to a loop.

update_loop_parents

Set the parents of a loop.

update_param_source

Set the source of a parameter.

using_resource

Context manager for managing StoreResource objects associated with the store.

write_empty_workflow

Write an empty persistent workflow.

zip

Convert the persistent store to zipped form.

Attributes

EAR_cache

Cache for persistent EARs.

element_cache

Cache for persistent elements.

element_iter_cache

Cache for persistent element iterations.

has_pending

Whether there are any pending changes.

is_submittable

Does this store support workflow submission?

logger

The logger to use.

num_EARs_cache

Cache for total number of persistent EARs.

num_tasks_cache

Cache for number of persistent tasks.

param_sources_cache

Cache for persistent parameter sources.

parameter_cache

Cache for persistent parameters.

task_cache

Cache for persistent tasks.

ts_fmt

The format for timestamps.

use_cache

Whether to use a cache.

workflow

The workflow this relates to.

zarr_store

The underlying store object.

Parameters:
property EAR_cache: dict[int, AnySEAR]#

Cache for persistent EARs.

add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata=None, save=True)#

Add a new EAR to an element iteration.

Parameters:
  • elem_iter_ID (int) –

  • action_idx (int) –

  • commands_idx (list[int]) –

  • data_idx (DataIndex) –

  • metadata (Metadata | None) –

  • save (bool) –

Return type:

int

add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#

Add a new element to a task.

Parameters:
Return type:

int

add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#

Add a new iteration to an element.

Parameters:
  • element_ID (int) –

  • data_idx (DataIndex) –

  • schema_parameters (list[str]) –

  • loop_idx (Mapping[str, int] | None) –

  • save (bool) –

Return type:

int

add_element_set(task_id, es_js, save=True)#

Add an element set to a task.

Parameters:
  • task_id (int) –

  • es_js (Mapping) –

  • save (bool) –

add_file(store_contents, is_input, source, path, contents=None, filename=None, save=True)#

Add a file that will be associated with a parameter.

Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • source (ParamSource) –

  • path (Path | str) –

  • contents (str | None) –

  • filename (str | None) –

  • save (bool) –

add_loop(loop_template, iterable_parameters, parents, num_added_iterations, iter_IDs, save=True)#

Add a new loop to the workflow.

Parameters:
  • loop_template (Mapping[str, Any]) –

  • parents (Sequence[str]) –

  • num_added_iterations (Mapping[tuple[int, ...], int]) –

  • iter_IDs (Iterable[int]) –

  • save (bool) –

add_set_parameter(data, source, save=True)#

Add a parameter that is set to a value.

Parameters:
Return type:

int

add_submission(sub_idx, sub_js, save=True)#

Add a new submission.

Parameters:
  • sub_idx (int) –

  • sub_js (JSONDocument) –

  • save (bool) –

add_submission_part(sub_idx, dt_str, submitted_js_idx, save=True)#

Add a submission part.

Parameters:
  • sub_idx (int) –

  • dt_str (str) –

  • submitted_js_idx (list[int]) –

  • save (bool) –

add_task(idx, task_template, save=True)#

Add a new task to the workflow.

Parameters:
  • idx (int) –

  • task_template (Mapping) –

  • save (bool) –

add_template_components(temp_comps, save=True)#

Add template components to the workflow.

Parameters:
Return type:

None

add_unset_parameter(source, save=True)#

Add a parameter that is not set to any value.

Parameters:
Return type:

int

cache_ctx()#

Context manager for using the persistent element/iteration/run cache.

Return type:

Iterator[None]

cached_load()#

Context manager to cache the root attributes.

Return type:

Iterator[None]

check_parameters_exist(ids)#

For each parameter ID, return True if it exists, else False.

Parameters:

ids (Sequence[int]) –

Return type:

Iterator[bool]

copy(path=None)#

Copy the workflow store.

This does not work on remote filesystems.

Parameters:

path (PathLike) –

Return type:

Path

delete()#

Delete the persistent workflow.

Return type:

None

delete_no_confirm()#

Permanently delete the workflow data with no confirmation.

Return type:

None

property element_cache: dict[int, AnySElement]#

Cache for persistent elements.

property element_iter_cache: dict[int, AnySElementIter]#

Cache for persistent element iterations.

get_EAR_skipped(EAR_ID)#

Whether the element action run with the given ID was skipped.

Parameters:

EAR_ID (int) –

Return type:

bool

get_EARs(ids)#

Get element action runs with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySEAR]

get_creation_info()#

Get information about the creation of the workflow.

get_element_iterations(ids)#

Get element iterations with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySElementIter]

get_elements(ids)#

Get elements with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySElement]

get_loops()#

Retrieve all loops, including pending.

Return type:

dict[int, LoopDescriptor]

get_loops_by_IDs(ids)#

Retrieve loops by index (ID), including pending.

Parameters:

ids (Iterable[int]) –

Return type:

dict[int, LoopDescriptor]

get_name()#

Get the name of the workflow.

get_parameter_set_statuses(ids)#

Get whether the parameters with the given IDs are set.

Parameters:

ids (Iterable[int]) –

Return type:

list[bool]

get_parameter_sources(ids)#

Get the sources of the parameters with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

list[ParamSource]

get_parameters(ids, **kwargs)#

Get parameters with the given IDs.

Parameters:

ids (Iterable[int]) – The IDs of the parameters to get.

Keyword Arguments:

dataset_copy (bool) – For Zarr stores only. If True, copy arrays as NumPy arrays.

Return type:

list[AnySParameter]

get_submissions()#

Retrieve all submissions, including pending.

Return type:

dict[int, JSONDocument]

get_submissions_by_ID(ids)#

Get submissions with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

dict[int, JSONDocument]

get_task(task_idx)#

Get a task.

Parameters:

task_idx (int) –

Return type:

AnySTask

get_task_elements(task_id, idx_lst=None)#

Get element data by an indices within a given task.

Element iterations and EARs belonging to the elements are included.

Parameters:
  • task_id (int) –

  • idx_lst (Iterable[int] | None) –

Return type:

Iterator[Mapping[str, Any]]

get_tasks()#

Retrieve all tasks, including pending.

Return type:

list[~AnySTask]

get_tasks_by_IDs(ids)#

Get tasks with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySTask]

get_template()#

Get the workflow template.

Return type:

dict[str, JSONed]

get_template_components()#

Get all template components, including pending.

Return type:

dict[str, Any]

get_ts_fmt()#

Get the format for timestamps.

get_ts_name_fmt()#

Get the format for timestamps to use in names.

property has_pending: bool#

Whether there are any pending changes.

property is_submittable: bool#

Does this store support workflow submission?

property logger: Logger#

The logger to use.

classmethod make_test_store_from_spec(spec, dir=None, path='test_store', overwrite=False)#

Generate an store for testing purposes.

property num_EARs_cache: int | None#

Cache for total number of persistent EARs.

property num_tasks_cache: int | None#

Cache for number of persistent tasks.

property param_sources_cache: dict[int, ParamSource]#

Cache for persistent parameter sources.

property parameter_cache: dict[int, AnySParameter]#

Cache for persistent parameters.

static prepare_test_store_from_spec(task_spec)#

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

Parameters:

task_spec (Sequence[Mapping[str, Sequence[Mapping[str, Sequence[Mapping[str, Sequence]]]]]]) –

Return type:

tuple[list[dict], list[dict], list[dict], list[dict]]

rechunk_parameter_base(chunk_size=None, backup=True, status=True)#

Rechunk the parameter data to be stored more efficiently.

Parameters:
  • chunk_size (int | None) –

  • backup (bool) –

  • status (bool) –

Return type:

Array

rechunk_runs(chunk_size=None, backup=True, status=True)#

Rechunk the run data to be stored more efficiently.

Parameters:
  • chunk_size (int | None) –

  • backup (bool) –

  • status (bool) –

Return type:

Array

reinstate_replaced_dir()#

Reinstate the directory containing replaced workflow details.

Return type:

None

remove_path(path)#

Try very hard to delete a directory or file.

Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is the case on a failed workflow creation (e.g. missing inputs), so in addition to catching PermissionErrors generated when Dropbox has a lock on files, we repeatedly try deleting the directory tree.

Parameters:

path (str | Path) –

Return type:

None

remove_replaced_dir()#

Remove the directory containing replaced workflow details.

Return type:

None

rename_path(replaced, original)#

Revert the replaced workflow path to its original name.

This happens when new workflow creation fails and there is an existing workflow with the same name; the original workflow which was renamed, must be reverted.

Parameters:
Return type:

None

save()#

Commit pending changes to disk, if not in batch-update mode.

Return type:

None

set_EAR_end(EAR_ID, exit_code, success, save=True)#

Mark an element action run as finished.

Parameters:
  • EAR_ID (int) –

  • exit_code (int) –

  • success (bool) –

  • save (bool) –

Return type:

datetime

set_EAR_skip(EAR_ID, save=True)#

Mark an element action run as skipped.

Parameters:
Return type:

None

set_EAR_start(EAR_ID, save=True)#

Mark an element action run as started.

Parameters:
Return type:

datetime

set_EAR_submission_index(EAR_ID, sub_idx, save=True)#

Set the submission index for an element action run.

Parameters:
  • EAR_ID (int) –

  • sub_idx (int) –

  • save (bool) –

Return type:

None

set_EARs_initialised(iter_ID, save=True)#

Mark an element action run as initialised.

Parameters:
  • iter_ID (int) –

  • save (bool) –

Return type:

None

set_file(store_contents, is_input, param_id, path, contents=None, filename=None, clean_up=False, save=True)#

Set details of a file, including whether it is associated with a parameter.

Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • param_id (int | None) –

  • path (Path | str) –

  • contents (str | None) –

  • filename (str | None) –

  • clean_up (bool) –

  • save (bool) –

set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)#

Set the metadata for a job script.

Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • version_info (VersionInfo | None) –

  • submit_time (str | None) –

  • submit_hostname (str | None) –

  • submit_machine (str | None) –

  • submit_cmdline (list[str] | None) –

  • os_name (str | None) –

  • shell_name (str | None) –

  • scheduler_name (str | None) –

  • scheduler_job_ID (str | None) –

  • process_ID (int | None) –

  • save (bool) –

set_parameter_value(param_id, value, is_file=False, save=True)#

Set the value of a parameter.

Parameters:
  • param_id (int) –

  • value (Any) –

  • is_file (bool) –

  • save (bool) –

property task_cache: dict[int, AnySTask]#

Cache for persistent tasks.

property ts_fmt: str#

The format for timestamps.

unzip(path='.', log=None)#

Convert this store into expanded form.

Parameters:
  • path (str) –

  • log (str | None) –

update_loop_num_iters(index, num_added_iters, save=True)#

Add iterations to a loop.

Parameters:
Return type:

None

update_loop_parents(index, num_added_iters, parents, save=True)#

Set the parents of a loop.

Parameters:
  • index (int) –

  • num_added_iters (Mapping[tuple[int, ...], int]) –

  • parents (Sequence[str]) –

  • save (bool) –

Return type:

None

update_param_source(param_sources, save=True)#

Set the source of a parameter.

Parameters:
Return type:

None

property use_cache: bool#

Whether to use a cache.

using_resource(res_label, action)#

Context manager for managing StoreResource objects associated with the store.

Parameters:
  • res_label (str) –

  • action (str) –

Return type:

Iterator[Any]

property workflow: Workflow#

The workflow this relates to.

classmethod write_empty_workflow(app, *, template_js, template_components_js, wk_path, fs, name, replaced_wk, ts_fmt, ts_name_fmt, creation_info, compressor='blosc', compressor_kwargs=None)#

Write an empty persistent workflow.

Parameters:
Return type:

None

property zarr_store: Store#

The underlying store object.

zip(path='.', log=None, overwrite=False, include_execute=False, include_rechunk_backups=False)#

Convert the persistent store to zipped form.

Parameters:
  • path (str) – Path at which to create the new zipped workflow. If this is an existing directory, the zip file will be created within this directory. Otherwise, this path is assumed to be the full file path to the new zip file.

  • log (str | None) –

  • overwrite (bool) –

  • include_execute (bool) –

  • include_rechunk_backups (bool) –