hpcflow.sdk.persistence.zarr.ZarrPersistentStore#

class hpcflow.sdk.persistence.zarr.ZarrPersistentStore(app, workflow, path, fs)#

Bases: PersistentStore

Methods

add_EAR

Add a new EAR to an element iteration.

add_element

Add a new element to a task.

add_element_iteration

Add a new iteration to an element.

add_element_set

add_file

add_loop

Add a new loop to the workflow.

add_set_parameter

add_submission

Add a new submission.

add_submission_attempt

add_task

Add a new task to the workflow.

add_template_components

add_unset_parameter

cached_load

Context manager to cache the root attributes.

check_parameters_exist

For each parameter ID, return True if it exists, else False.

copy

Copy the workflow store.

delete

Delete the persistent workflow.

delete_no_confirm

Permanently delete the workflow data with no confirmation.

get_EAR_skipped

get_EARs

get_creation_info

get_element_iterations

get_elements

get_fs_path

get_loops

Retrieve all loops, including pending.

get_loops_by_IDs

Retrieve loops by index (ID), including pending.

get_parameter_set_statuses

get_parameter_sources

get_parameters

Retrieve parameters by their IDs.

get_submissions

Retrieve all submissions, including pending.

get_submissions_by_ID

get_task

get_task_elements

Get element data by an index slice within a given task.

get_tasks

Retrieve all tasks, including pending.

get_tasks_by_IDs

get_template

get_template_components

Get all template components, including pending.

make_test_store_from_spec

Generate a store for testing purposes.

prepare_test_store_from_spec

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

reinstate_replaced_dir

remove_path

Try very hard to delete a directory or file.

remove_replaced_dir

rename_path

Revert the replaced workflow path to its original name.

save

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end

set_EAR_skip

set_EAR_start

set_EAR_submission_index

set_file

set_jobscript_job_ID

set_jobscript_submit_time

set_jobscript_version_info

set_parameter_value

to_zip

update_loop_num_iters

update_param_source

using_resource

Context manager for managing StoreResource objects associated with the store.

write_empty_workflow

Attributes

has_pending

logger

ts_fmt

zarr_store

add_EAR(elem_iter_ID, action_idx, data_idx, metadata, save=True)#

Add a new EAR to an element iteration.

Parameters:
  • elem_iter_ID (int) –

  • action_idx (int) –

  • data_idx (Dict) –

  • metadata (Dict) –

  • save (bool) –

Return type:

int

add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#

Add a new element to a task.

Parameters:
  • task_ID (int) –

  • es_idx (int) –

  • seq_idx (Dict) –

  • src_idx (Dict) –

  • save (bool) –

add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#

Add a new iteration to an element.

Parameters:
  • element_ID (int) –

  • data_idx (Dict) –

  • schema_parameters (List[str]) –

  • loop_idx (Dict | None) –

  • save (bool) –

Return type:

int

add_element_set(task_id, es_js, save=True)#
Parameters:
add_file(store_contents, is_input, source, path=None, contents=None, filename=None, save=True)#
Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • source (Dict) –

  • contents (str) –

  • filename (str) –

  • save (bool) –

add_loop(loop_template, iterable_parameters, iter_IDs, save=True)#

Add a new loop to the workflow.

Parameters:
add_set_parameter(data, source, save=True)#
Parameters:
Return type:

int

add_submission(sub_idx, sub_js, save=True)#

Add a new submission.

Parameters:
add_submission_attempt(sub_idx, submitted_js_idx, save=True)#
Parameters:
add_task(idx, task_template, save=True)#

Add a new task to the workflow.

Parameters:
  • idx (int) –

  • task_template (Dict) –

  • save (bool) –

add_template_components(temp_comps, save=True)#
Parameters:
Return type:

None

add_unset_parameter(source, save=True)#
Parameters:
Return type:

int

cached_load()#

Context manager to cache the root attributes.

Return type:

Iterator[Dict]

check_parameters_exist(id_lst)#

For each parameter ID, return True if it exists, else False.

Parameters:

id_lst (Iterable[int]) –

Return type:

List[bool]

copy(path=None)#

Copy the workflow store.

This does not work on remote filesystems.

Return type:

str

delete()#

Delete the persistent workflow.

Return type:

None

delete_no_confirm()#

Permanently delete the workflow data with no confirmation.

Return type:

None

get_EAR_skipped(EAR_ID)#
Parameters:

EAR_ID (int) –

Return type:

bool

get_EARs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySEAR]

get_creation_info()#
get_element_iterations(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySElementIter]

get_elements(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySElement]

get_fs_path()#
get_loops()#

Retrieve all loops, including pending.

Return type:

Dict[int, Dict]

get_loops_by_IDs(id_lst)#

Retrieve loops by index (ID), including pending.

Parameters:

id_lst (Iterable[int]) –

Return type:

Dict[int, Dict]

get_parameter_set_statuses(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[bool]

get_parameter_sources(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[Dict]

get_parameters(id_lst, **kwargs)#
Parameters:
  • kwargs (Dict) –

    dataset_copy (bool)

    For Zarr stores only. If True, copy arrays as NumPy arrays.

  • id_lst (Iterable[int]) –

Return type:

List[AnySParameter]

get_submissions()#

Retrieve all submissions, including pending.

Return type:

Dict[int, Dict]

get_submissions_by_ID(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

Dict[int, Dict]

get_task(task_idx)#
Parameters:

task_idx (int) –

Return type:

AnySTask

get_task_elements(task_id, idx_sel)#

Get element data by an index slice within a given task.

Element iterations and EARs belonging to the elements are included.

Parameters:

idx_sel (slice) –

Return type:

List[Dict]

get_tasks()#

Retrieve all tasks, including pending.

Return type:

List[AnySTask]

get_tasks_by_IDs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySTask]

get_template()#
Return type:

Dict

get_template_components()#

Get all template components, including pending.

Return type:

Dict

property has_pending#
property logger#
classmethod make_test_store_from_spec(spec, dir=None, path='test_store', overwrite=False)#

Generate a store for testing purposes.

static prepare_test_store_from_spec(task_spec)#

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

reinstate_replaced_dir()#
Return type:

None

remove_path(path, fs)#

Try very hard to delete a directory or file.

Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is the case on a failed workflow creation (e.g. missing inputs), so in addition to catching PermissionErrors generated when Dropbox has a lock on files, we repeatedly try deleting the directory tree.

Parameters:

path (str) –

Return type:

None

remove_replaced_dir()#
Return type:

None

rename_path(replaced, original, fs)#

Revert the replaced workflow path to its original name.

This happens when new workflow creation fails and there is an existing workflow with the same name; the original workflow, which was renamed, must be reverted.

Parameters:
  • replaced (str) –

  • original (str) –

Return type:

None

save()#

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end(EAR_ID, exit_code, success, save=True)#
Parameters:
  • EAR_ID (int) –

  • exit_code (int) –

  • success (bool) –

  • save (bool) –

Return type:

datetime

set_EAR_skip(EAR_ID, save=True)#
Parameters:
Return type:

None

set_EAR_start(EAR_ID, save=True)#
Parameters:
Return type:

datetime

set_EAR_submission_index(EAR_ID, sub_idx, save=True)#
Parameters:
  • EAR_ID (int) –

  • sub_idx (int) –

  • save (bool) –

Return type:

None

set_file(store_contents, is_input, param_id=None, path=None, contents=None, filename=None, clean_up=False, save=True)#
Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • param_id (int) –

  • contents (str) –

  • filename (str) –

  • clean_up (bool) –

  • save (bool) –

set_jobscript_job_ID(sub_idx, js_idx, job_ID, save=True)#
Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • job_ID (str) –

  • save (bool) –

set_jobscript_submit_time(sub_idx, js_idx, submit_time, save=True)#
Parameters:
set_jobscript_version_info(sub_idx, js_idx, vers_info, save=True)#
Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • vers_info (Dict) –

  • save (bool) –

set_parameter_value(param_id, value, is_file=False, save=True)#
Parameters:
  • param_id (int) –

  • value (Any) –

  • is_file (bool) –

  • save (bool) –

to_zip()#
property ts_fmt: str#
update_loop_num_iters(index, num_iters, save=True)#
Parameters:
  • index (int) –

  • num_iters (int) –

  • save (bool) –

Return type:

None

update_param_source(param_id, source, save=True)#
Parameters:
Return type:

None

using_resource(res_label, action)#

Context manager for managing StoreResource objects associated with the store.

classmethod write_empty_workflow(app, template_js, template_components_js, wk_path, fs, fs_path, replaced_wk, creation_info)#
Parameters:
  • template_js (Dict) –

  • template_components_js (Dict) –

  • wk_path (str) –

  • fs_path (str) –

  • replaced_wk (str) –

  • creation_info (Dict) –

Return type:

None

property zarr_store: Store#