hpcflow.sdk.persistence.zarr.ZarrZipPersistentStore#

class hpcflow.sdk.persistence.zarr.ZarrZipPersistentStore(app, workflow, path, fs)#

Bases: ZarrPersistentStore

A store designed mainly as an archive format that can be uploaded to data repositories such as Zenodo.

Methods

add_EAR

Add a new EAR to an element iteration.

add_element

Add a new element to a task.

add_element_iteration

Add a new iteration to an element.

add_element_set

add_file

add_loop

Add a new loop to the workflow.

add_set_parameter

add_submission

Add a new submission.

add_submission_part

add_task

Add a new task to the workflow.

add_template_components

add_unset_parameter

cache_ctx

Context manager for using the persistent element/iteration/run cache.

cached_load

Context manager to cache the root attributes.

check_parameters_exist

For each parameter ID, return True if it exists, else False.

copy

Copy the workflow store.

delete

Delete the persistent workflow.

delete_no_confirm

Permanently delete the workflow data with no confirmation.

get_EAR_skipped

get_EARs

get_creation_info

get_element_iterations

get_elements

get_loops

Retrieve all loops, including pending.

get_loops_by_IDs

Retrieve loops by index (ID), including pending.

get_name

get_parameter_set_statuses

get_parameter_sources

get_parameters

param kwargs: dataset_copy (bool) – For Zarr stores only. If True, copy arrays as NumPy arrays.

get_submissions

Retrieve all submissions, including pending.

get_submissions_by_ID

get_task

get_task_elements

Get element data by indices within a given task.

get_tasks

Retrieve all tasks, including pending.

get_tasks_by_IDs

get_template

get_template_components

Get all template components, including pending.

get_ts_fmt

get_ts_name_fmt

make_test_store_from_spec

Generate a store for testing purposes.

prepare_test_store_from_spec

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

rechunk_parameter_base

rechunk_runs

reinstate_replaced_dir

remove_path

Try very hard to delete a directory or file.

remove_replaced_dir

rename_path

Revert the replaced workflow path to its original name.

save

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end

set_EAR_skip

set_EAR_start

set_EAR_submission_index

set_EARs_initialised

set_file

set_jobscript_metadata

set_parameter_value

unzip

param path:

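Path at which to create the new unzipped workflow. If this is an existing directory, the new workflow directory will be created within this directory. Otherwise, this path will represent the new workflow directory path.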

update_loop_num_iters

update_loop_parents

update_param_source

using_resource

Context manager for managing StoreResource objects associated with the store.

write_empty_workflow

zip

param path:

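Path at which to create the new zipped workflow. If this is an existing directory, the zip file will be created within this directory. Otherwise, this path is assumed to be the full file path to the new zip file.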

Attributes

EAR_cache

Cache for persistent EARs.

element_cache

Cache for persistent elements.

element_iter_cache

Cache for persistent element iterations.

has_pending

is_submittable

Does this store support workflow submission?

logger

num_EARs_cache

Cache for total number of persistent EARs.

num_tasks_cache

Cache for number of persistent tasks.

param_sources_cache

Cache for persistent parameter sources.

parameter_cache

Cache for persistent parameters.

task_cache

Cache for persistent tasks.

ts_fmt

use_cache

zarr_store

property EAR_cache#

Cache for persistent EARs.

add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata, save=True)#

Add a new EAR to an element iteration.

Parameters:
  • elem_iter_ID (int) –

  • action_idx (int) –

  • commands_idx (List[int]) –

  • data_idx (Dict) –

  • metadata (Dict) –

  • save (bool) –

Return type:

int

add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#

Add a new element to a task.

Parameters:
  • task_ID (int) –

  • es_idx (int) –

  • seq_idx (Dict) –

  • src_idx (Dict) –

  • save (bool) –

add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#

Add a new iteration to an element.

Parameters:
  • element_ID (int) –

  • data_idx (Dict) –

  • schema_parameters (List[str]) –

  • loop_idx (Dict | None) –

  • save (bool) –

Return type:

int

add_element_set(task_id, es_js, save=True)#
Parameters:
add_file(store_contents, is_input, source, path=None, contents=None, filename=None, save=True)#
Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • source (Dict) –

  • contents (str) –

  • filename (str) –

  • save (bool) –

add_loop(loop_template, iterable_parameters, parents, num_added_iterations, iter_IDs, save=True)#

Add a new loop to the workflow.

Parameters:
add_set_parameter(data, source, save=True)#
Parameters:
Return type:

int

add_submission(sub_idx, sub_js, save=True)#

Add a new submission.

Parameters:
add_submission_part(sub_idx, dt_str, submitted_js_idx, save=True)#
Parameters:
  • sub_idx (int) –

  • dt_str (str) –

  • submitted_js_idx (List[int]) –

  • save (bool) –

add_task(idx, task_template, save=True)#

Add a new task to the workflow.

Parameters:
  • idx (int) –

  • task_template (Dict) –

  • save (bool) –

add_template_components(temp_comps, save=True)#
Parameters:
Return type:

None

add_unset_parameter(source, save=True)#
Parameters:
Return type:

int

cache_ctx()#

Context manager for using the persistent element/iteration/run cache.

cached_load()#

Context manager to cache the root attributes.

Return type:

Iterator[Dict]

check_parameters_exist(id_lst)#

For each parameter ID, return True if it exists, else False.

Parameters:

id_lst (Iterable[int]) –

Return type:

List[bool]

copy(path=None)#

Copy the workflow store.

This does not work on remote filesystems.

Return type:

str

delete()#

Delete the persistent workflow.

Return type:

None

delete_no_confirm()#

Permanently delete the workflow data with no confirmation.

Return type:

None

property element_cache#

Cache for persistent elements.

property element_iter_cache#

Cache for persistent element iterations.

get_EAR_skipped(EAR_ID)#
Parameters:

EAR_ID (int) –

Return type:

bool

get_EARs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySEAR]

get_creation_info()#
get_element_iterations(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySElementIter]

get_elements(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySElement]

get_loops()#

Retrieve all loops, including pending.

Return type:

Dict[int, Dict]

get_loops_by_IDs(id_lst)#

Retrieve loops by index (ID), including pending.

Parameters:

id_lst (Iterable[int]) –

Return type:

Dict[int, Dict]

get_name()#
get_parameter_set_statuses(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[bool]

get_parameter_sources(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[Dict]

get_parameters(id_lst, **kwargs)#
Parameters:
  • kwargs (Dict) –

    dataset_copy (bool)

    For Zarr stores only. If True, copy arrays as NumPy arrays.

  • id_lst (Iterable[int]) –

Return type:

List[AnySParameter]

get_submissions()#

Retrieve all submissions, including pending.

Return type:

Dict[int, Dict]

get_submissions_by_ID(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

Dict[int, Dict]

get_task(task_idx)#
Parameters:

task_idx (int) –

Return type:

AnySTask

get_task_elements(task_id, idx_lst=None)#

Get element data by indices within a given task.

Element iterations and EARs belonging to the elements are included.

Parameters:

idx_lst (Iterable[int] | None) –

Return type:

List[Dict]

get_tasks()#

Retrieve all tasks, including pending.

Return type:

List[AnySTask]

get_tasks_by_IDs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySTask]

get_template()#
Return type:

Dict

get_template_components()#

Get all template components, including pending.

Return type:

Dict

get_ts_fmt()#
get_ts_name_fmt()#
property has_pending#
property is_submittable#

Does this store support workflow submission?

property logger#
classmethod make_test_store_from_spec(spec, dir=None, path='test_store', overwrite=False)#

Generate a store for testing purposes.

property num_EARs_cache#

Cache for total number of persistent EARs.

property num_tasks_cache#

Cache for number of persistent tasks.

property param_sources_cache#

Cache for persistent parameter sources.

property parameter_cache#

Cache for persistent parameters.

static prepare_test_store_from_spec(task_spec)#

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

rechunk_parameter_base(chunk_size=None, backup=True, status=True)#
Parameters:
  • chunk_size (int | None) –

  • backup (bool | None) –

  • status (bool | None) –

rechunk_runs(chunk_size=None, backup=True, status=True)#
Parameters:
  • chunk_size (int | None) –

  • backup (bool | None) –

  • status (bool | None) –

reinstate_replaced_dir()#
Return type:

None

remove_path(path, fs)#

Try very hard to delete a directory or file.

Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is the case on a failed workflow creation (e.g. missing inputs), so in addition to catching PermissionErrors generated when Dropbox has a lock on files, we repeatedly try deleting the directory tree.

Parameters:

path (str) –

Return type:

None

remove_replaced_dir()#
Return type:

None

rename_path(replaced, original, fs)#

Revert the replaced workflow path to its original name.

This happens when new workflow creation fails and there is an existing workflow with the same name; the original workflow, which was renamed, must be reverted.

Parameters:
  • replaced (str) –

  • original (str) –

Return type:

None

save()#

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end(EAR_ID, exit_code, success, save=True)#
Parameters:
  • EAR_ID (int) –

  • exit_code (int) –

  • success (bool) –

  • save (bool) –

Return type:

datetime

set_EAR_skip(EAR_ID, save=True)#
Parameters:
Return type:

None

set_EAR_start(EAR_ID, save=True)#
Parameters:
Return type:

datetime

set_EAR_submission_index(EAR_ID, sub_idx, save=True)#
Parameters:
  • EAR_ID (int) –

  • sub_idx (int) –

  • save (bool) –

Return type:

None

set_EARs_initialised(iter_ID, save=True)#
Parameters:
  • iter_ID (int) –

  • save (bool) –

Return type:

None

set_file(store_contents, is_input, param_id=None, path=None, contents=None, filename=None, clean_up=False, save=True)#
Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • param_id (int) –

  • contents (str) –

  • filename (str) –

  • clean_up (bool) –

  • save (bool) –

set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)#
Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • version_info (Dict | None) –

  • submit_time (str | None) –

  • submit_hostname (str | None) –

  • submit_machine (str | None) –

  • submit_cmdline (List[str] | None) –

  • os_name (str | None) –

  • shell_name (str | None) –

  • scheduler_name (str | None) –

  • scheduler_job_ID (str | None) –

  • process_ID (int | None) –

  • save (bool) –

set_parameter_value(param_id, value, is_file=False, save=True)#
Parameters:
  • param_id (int) –

  • value (Any) –

  • is_file (bool) –

  • save (bool) –

property task_cache#

Cache for persistent tasks.

property ts_fmt: str#
unzip(path='.', log=None)#
Parameters:

path – Path at which to create the new unzipped workflow. If this is an existing directory, the new workflow directory will be created within this directory. Otherwise, this path will represent the new workflow directory path.

update_loop_num_iters(index, num_added_iters, save=True)#
Parameters:
  • index (int) –

  • num_added_iters (int) –

  • save (bool) –

Return type:

None

update_loop_parents(index, num_added_iters, parents, save=True)#
Parameters:
Return type:

None

update_param_source(param_sources, save=True)#
Parameters:
Return type:

None

property use_cache#
using_resource(res_label, action)#

Context manager for managing StoreResource objects associated with the store.

classmethod write_empty_workflow(app, template_js, template_components_js, wk_path, fs, name, replaced_wk, ts_fmt, ts_name_fmt, creation_info, compressor='blosc', compressor_kwargs=None)#
Parameters:
  • template_js (Dict) –

  • template_components_js (Dict) –

  • wk_path (str) –

  • name (str) –

  • replaced_wk (str) –

  • ts_fmt (str) –

  • ts_name_fmt (str) –

  • creation_info (Dict) –

  • compressor (str | None) –

  • compressor_kwargs (Dict[str, Any] | None) –

Return type:

None

property zarr_store: Store#
zip()#
Parameters:

path – Path at which to create the new zipped workflow. If this is an existing directory, the zip file will be created within this directory. Otherwise, this path is assumed to be the full file path to the new zip file.