hpcflow.sdk.persistence.base.PersistentStore

class hpcflow.sdk.persistence.base.PersistentStore(app, workflow, path, fs=None)

Bases: ABC, Generic[AnySTask, AnySElement, AnySElementIter, AnySEAR, AnySParameter]

An abstract class representing a persistent workflow store.

Parameters:
  • app (App) – The main hpcflow core.

  • workflow (Workflow) – The workflow being persisted.

  • path (pathlib.Path) – Where to hold the store.

  • fs (fsspec.AbstractFileSystem) – Optionally, information about how to access the store.
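
The bases listed above (ABC plus a Generic over the record types) suggest that a concrete store implements the abstract methods and fixes the record types it returns. The following is a minimal sketch of that pattern only; `ToyStore`, `InMemoryStore`, and `ToyTask` are hypothetical names for illustration and are not part of hpcflow.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")  # stands in for a type variable such as AnySTask


@dataclass
class ToyTask:
    """Hypothetical task record; not an hpcflow class."""

    id_: int
    name: str


class ToyStore(ABC, Generic[T]):
    """Hypothetical abstract store, parameterised by its task record type."""

    @abstractmethod
    def get_name(self) -> str:
        """Return the workflow name."""

    @abstractmethod
    def get_task(self, task_idx: int) -> T:
        """Return the task at the given index."""


class InMemoryStore(ToyStore[ToyTask]):
    """Concrete subclass that fixes T to ToyTask."""

    def __init__(self, name: str, tasks: list[ToyTask]) -> None:
        self._name = name
        self._tasks = tasks

    def get_name(self) -> str:
        return self._name

    def get_task(self, task_idx: int) -> ToyTask:
        return self._tasks[task_idx]


store = InMemoryStore("demo", [ToyTask(0, "t1")])
```

Instantiating `ToyStore` directly raises `TypeError` because its abstract methods are unimplemented; only the concrete subclass can be created.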

Methods

add_EAR

Add a new EAR to an element iteration.

add_element

Add a new element to a task.

add_element_iteration

Add a new iteration to an element.

add_element_set

Add an element set to a task.

add_file

Add a file that will be associated with a parameter.

add_loop

Add a new loop to the workflow.

add_set_parameter

Add a parameter that is set to a value.

add_submission

Add a new submission.

add_submission_part

Add a submission part.

add_task

Add a new task to the workflow.

add_template_components

Add template components to the workflow.

add_unset_parameter

Add a parameter that is not set to any value.

cache_ctx

Context manager for using the persistent element/iteration/run cache.

cached_load

Perform a load with cache enabled while the with-wrapped code runs.

check_parameters_exist

For each parameter ID, return True if it exists, else False.

copy

Copy the workflow store.

delete

Delete the persistent workflow.

delete_no_confirm

Permanently delete the workflow data with no confirmation.

get_EAR_skipped

Whether the element action run with the given ID was skipped.

get_EARs

Get element action runs with the given IDs.

get_creation_info

Get the workflow creation data.

get_element_iterations

Get element iterations with the given IDs.

get_elements

Get elements with the given IDs.

get_loops

Retrieve all loops, including pending.

get_loops_by_IDs

Retrieve loops by index (ID), including pending.

get_name

Get the workflow name.

get_parameter_set_statuses

Get whether the parameters with the given IDs are set.

get_parameter_sources

Get the sources of the parameters with the given IDs.

get_parameters

Get parameters with the given IDs.

get_submissions

Retrieve all submissions, including pending.

get_submissions_by_ID

Get submissions with the given IDs.

get_task

Get a task.

get_task_elements

Get element data by indices within a given task.

get_tasks

Retrieve all tasks, including pending.

get_tasks_by_IDs

Get tasks with the given IDs.

get_template

Get the workflow template.

get_template_components

Get all template components, including pending.

get_ts_fmt

Get the timestamp format.

get_ts_name_fmt

Get the timestamp format for names.

prepare_test_store_from_spec

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

rechunk_parameter_base

rechunk_runs

reinstate_replaced_dir

Reinstate a replaced directory.

remove_path

Try very hard to delete a directory or file.

remove_replaced_dir

Remove a replaced directory.

rename_path

Revert the replaced workflow path to its original name.

save

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end

Mark an element action run as finished.

set_EAR_skip

Mark an element action run as skipped.

set_EAR_start

Mark an element action run as started.

set_EAR_submission_index

Set the submission index for an element action run.

set_EARs_initialised

Mark the element action runs of an element iteration as initialised.

set_file

Set details of a file, including whether it is associated with a parameter.

set_jobscript_metadata

Set the metadata for a job script.

set_parameter_value

Set the value of a parameter.

unzip

Convert this store into expanded form.

update_loop_num_iters

Add iterations to a loop.

update_loop_parents

Set the parents of a loop.

update_param_source

Set the source of a parameter.

using_resource

Context manager for managing StoreResource objects associated with the store.

write_empty_workflow

Write an empty workflow.

zip

Convert this store into archival form.

Attributes

EAR_cache

Cache for persistent EARs.

element_cache

Cache for persistent elements.

element_iter_cache

Cache for persistent element iterations.

has_pending

Whether there are any pending changes.

is_submittable

Does this store support workflow submission?

logger

The logger to use.

num_EARs_cache

Cache for total number of persistent EARs.

num_tasks_cache

Cache for number of persistent tasks.

param_sources_cache

Cache for persistent parameter sources.

parameter_cache

Cache for persistent parameters.

task_cache

Cache for persistent tasks.

ts_fmt

The format for timestamps.

use_cache

Whether to use a cache.

workflow

The workflow this relates to.

property EAR_cache: dict[int, AnySEAR]

Cache for persistent EARs.

add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata=None, save=True)

Add a new EAR to an element iteration.

Parameters:
  • elem_iter_ID (int) –

  • action_idx (int) –

  • commands_idx (list[int]) –

  • data_idx (DataIndex) –

  • metadata (Metadata | None) –

  • save (bool) –

Return type:

int

add_element(task_ID, es_idx, seq_idx, src_idx, save=True)

Add a new element to a task.

Parameters:
  • task_ID –

  • es_idx –

  • seq_idx –

  • src_idx –

  • save (bool) –

Return type:

int

add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)

Add a new iteration to an element.

Parameters:
  • element_ID (int) –

  • data_idx (DataIndex) –

  • schema_parameters (list[str]) –

  • loop_idx (Mapping[str, int] | None) –

  • save (bool) –

Return type:

int

add_element_set(task_id, es_js, save=True)

Add an element set to a task.

Parameters:
  • task_id (int) –

  • es_js (Mapping) –

  • save (bool) –

add_file(store_contents, is_input, source, path, contents=None, filename=None, save=True)

Add a file that will be associated with a parameter.

Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • source (ParamSource) –

  • path (Path | str) –

  • contents (str | None) –

  • filename (str | None) –

  • save (bool) –

add_loop(loop_template, iterable_parameters, parents, num_added_iterations, iter_IDs, save=True)

Add a new loop to the workflow.

Parameters:
  • loop_template (Mapping[str, Any]) –

  • iterable_parameters –

  • parents (Sequence[str]) –

  • num_added_iterations (Mapping[tuple[int, ...], int]) –

  • iter_IDs (Iterable[int]) –

  • save (bool) –

add_set_parameter(data, source, save=True)

Add a parameter that is set to a value.

Parameters:
  • data –

  • source –

  • save (bool) –

Return type:

int

add_submission(sub_idx, sub_js, save=True)

Add a new submission.

Parameters:
  • sub_idx (int) –

  • sub_js (JSONDocument) –

  • save (bool) –

add_submission_part(sub_idx, dt_str, submitted_js_idx, save=True)

Add a submission part.

Parameters:
  • sub_idx (int) –

  • dt_str (str) –

  • submitted_js_idx (list[int]) –

  • save (bool) –

add_task(idx, task_template, save=True)

Add a new task to the workflow.

Parameters:
  • idx (int) –

  • task_template (Mapping) –

  • save (bool) –

add_template_components(temp_comps, save=True)

Add template components to the workflow.

Parameters:
  • temp_comps –

  • save (bool) –

Return type:

None

add_unset_parameter(source, save=True)

Add a parameter that is not set to any value.

Parameters:
  • source –

  • save (bool) –

Return type:

int

cache_ctx()

Context manager for using the persistent element/iteration/run cache.

Return type:

Iterator[None]
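
As an illustration of how a cache-enabling context manager of this kind can behave, here is a minimal sketch. `ToyCachedStore` and its `get_element` method are hypothetical, and the choice to clear the cache on exit is an assumption made for the example, not a statement about hpcflow's behaviour.

```python
from contextlib import contextmanager
from typing import Iterator


class ToyCachedStore:
    """Hypothetical store with a switchable read cache; not hpcflow code."""

    def __init__(self) -> None:
        self.use_cache = False
        self.element_cache: dict[int, str] = {}
        self.reads = 0  # counts simulated disk reads

    def get_element(self, id_: int) -> str:
        if self.use_cache and id_ in self.element_cache:
            return self.element_cache[id_]
        self.reads += 1  # simulate an expensive read
        value = f"element-{id_}"
        if self.use_cache:
            self.element_cache[id_] = value
        return value

    @contextmanager
    def cache_ctx(self) -> Iterator[None]:
        """Enable the cache for the wrapped block, then disable and clear it."""
        self.use_cache = True
        try:
            yield
        finally:
            # Assumption for this sketch: cached data is dropped on exit.
            self.use_cache = False
            self.element_cache.clear()


store = ToyCachedStore()
with store.cache_ctx():
    store.get_element(1)
    store.get_element(1)  # served from the cache, no second read
reads_with_cache = store.reads
```

The `try`/`finally` ensures the cache is switched off even if the wrapped code raises.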

abstract cached_load()

Perform a load with cache enabled while the with-wrapped code runs.

Return type:

AbstractContextManager[None]

check_parameters_exist(ids)

For each parameter ID, return True if it exists, else False.

Parameters:

ids (Sequence[int]) –

Return type:

Iterator[bool]
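
Because the return type is an iterator rather than a list, callers would typically materialise it or pair it with the input IDs. A small sketch of that calling pattern, using a hypothetical stand-in function `check_ids_exist` and a plain set of known IDs rather than a real store:

```python
from typing import Iterator, Sequence


def check_ids_exist(ids: Sequence[int], known: set[int]) -> Iterator[bool]:
    """Yield True or False for each ID, in the order given."""
    for id_ in ids:
        yield id_ in known


known_ids = {0, 1, 4}  # hypothetical set of existing parameter IDs
query = [0, 2, 4]

flags = list(check_ids_exist(query, known_ids))
# Pair each queried ID with its result to find the missing ones.
missing = [id_ for id_, ok in zip(query, check_ids_exist(query, known_ids)) if not ok]
```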

copy(path=None)

Copy the workflow store.

This does not work on remote filesystems.

Parameters:

path (PathLike) –

Return type:

Path

delete()

Delete the persistent workflow.

Return type:

None

delete_no_confirm()

Permanently delete the workflow data with no confirmation.

Return type:

None

property element_cache: dict[int, AnySElement]

Cache for persistent elements.

property element_iter_cache: dict[int, AnySElementIter]

Cache for persistent element iterations.

get_EAR_skipped(EAR_ID)

Whether the element action run with the given ID was skipped.

Parameters:

EAR_ID (int) –

Return type:

bool

get_EARs(ids)

Get element action runs with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySEAR]

abstract get_creation_info()

Get the workflow creation data.

Return type:

StoreCreationInfo

get_element_iterations(ids)

Get element iterations with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySElementIter]

get_elements(ids)

Get elements with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySElement]

get_loops()

Retrieve all loops, including pending.

Return type:

dict[int, LoopDescriptor]

get_loops_by_IDs(ids)

Retrieve loops by index (ID), including pending.

Parameters:

ids (Iterable[int]) –

Return type:

dict[int, LoopDescriptor]

abstract get_name()

Get the workflow name.

Return type:

str

get_parameter_set_statuses(ids)

Get whether the parameters with the given IDs are set.

Parameters:

ids (Iterable[int]) –

Return type:

list[bool]

get_parameter_sources(ids)

Get the sources of the parameters with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

list[ParamSource]

get_parameters(ids, **kwargs)

Get parameters with the given IDs.

Parameters:

ids (Iterable[int]) – The IDs of the parameters to get.

Keyword Arguments:

dataset_copy (bool) – For Zarr stores only. If True, copy arrays as NumPy arrays.

Return type:

list[AnySParameter]

get_submissions()

Retrieve all submissions, including pending.

Return type:

dict[int, JSONDocument]

get_submissions_by_ID(ids)

Get submissions with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

dict[int, JSONDocument]

get_task(task_idx)

Get a task.

Parameters:

task_idx (int) –

Return type:

AnySTask

get_task_elements(task_id, idx_lst=None)

Get element data by indices within a given task.

Element iterations and EARs belonging to the elements are included.

Parameters:
  • task_id (int) –

  • idx_lst (Iterable[int] | None) –

Return type:

Iterator[Mapping[str, Any]]

get_tasks()

Retrieve all tasks, including pending.

Return type:

list[AnySTask]

get_tasks_by_IDs(ids)

Get tasks with the given IDs.

Parameters:

ids (Iterable[int]) –

Return type:

Sequence[AnySTask]

get_template()

Get the workflow template.

Return type:

dict[str, JSONed]

get_template_components()

Get all template components, including pending.

Return type:

dict[str, Any]

abstract get_ts_fmt()

Get the timestamp format.

Return type:

str

abstract get_ts_name_fmt()

Get the timestamp format for names.

Return type:

str

property has_pending: bool

Whether there are any pending changes.

property is_submittable: bool

Does this store support workflow submission?

property logger: Logger

The logger to use.

property num_EARs_cache: int | None

Cache for total number of persistent EARs.

property num_tasks_cache: int | None

Cache for number of persistent tasks.

property param_sources_cache: dict[int, ParamSource]

Cache for persistent parameter sources.

property parameter_cache: dict[int, AnySParameter]

Cache for persistent parameters.

static prepare_test_store_from_spec(task_spec)

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

Parameters:

task_spec (Sequence[Mapping[str, Sequence[Mapping[str, Sequence[Mapping[str, Sequence]]]]]]) –

Return type:

tuple[list[dict], list[dict], list[dict], list[dict]]

abstract rechunk_parameter_base(chunk_size=None, backup=True, status=True)

Parameters:
  • chunk_size (int | None) –

  • backup (bool) –

  • status (bool) –

Return type:

Any

abstract rechunk_runs(chunk_size=None, backup=True, status=True)

Parameters:
  • chunk_size (int | None) –

  • backup (bool) –

  • status (bool) –

Return type:

Any

abstract reinstate_replaced_dir()

Reinstate a replaced directory.

Return type:

None

remove_path(path)

Try very hard to delete a directory or file.

Dropbox (on Windows, at least) appears to re-sync files if the parent directory is deleted soon after creation, as happens when workflow creation fails (e.g. because of missing inputs). In addition to catching the PermissionError raised while Dropbox holds a lock on files, this method therefore repeatedly retries deleting the directory tree.

Parameters:

path (str | Path) –

Return type:

None
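
The retry strategy described above can be sketched as follows. This is an illustrative stand-alone function, not hpcflow's implementation; the name `remove_with_retries` and the `retries` and `delay` parameters are assumptions introduced for the example.

```python
import shutil
import time
from pathlib import Path


def remove_with_retries(path, retries: int = 5, delay: float = 0.1) -> None:
    """Delete a file or directory tree, retrying on PermissionError."""
    target = Path(path)
    for _ in range(retries):
        try:
            if target.is_dir():
                shutil.rmtree(target)
            elif target.exists():
                target.unlink()
        except PermissionError:
            # Another process (e.g. a sync client) may hold a lock; wait and retry.
            time.sleep(delay)
            continue
        if not target.exists():
            return
        # The path reappeared or was only partly removed; wait and try again.
        time.sleep(delay)
    raise RuntimeError(f"could not remove {target} after {retries} attempts")
```

Checking `target.exists()` after a seemingly successful removal covers the case in which a sync client recreates files shortly after they are deleted.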

abstract remove_replaced_dir()

Remove a replaced directory.

Return type:

None

rename_path(replaced, original)

Revert the replaced workflow path to its original name.

This happens when creating a new workflow fails and a workflow with the same name already exists: the original workflow, which was renamed, must be reverted to its original name.

Parameters:
  • replaced –

  • original –

Return type:

None

save()

Commit pending changes to disk, if not in batch-update mode.

Return type:

None
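
Many methods on this page take a `save` flag, and `has_pending` reports uncommitted changes. A minimal sketch of how such a pending/commit scheme can fit together is shown below; `ToyPendingStore` and its `in_batch_mode` attribute are hypothetical and only illustrate the idea.

```python
class ToyPendingStore:
    """Hypothetical store that buffers changes before committing them."""

    def __init__(self) -> None:
        self.committed: list[str] = []  # stands in for data on disk
        self.pending: list[str] = []  # changes not yet written
        self.in_batch_mode = False  # assumed flag, for illustration only

    @property
    def has_pending(self) -> bool:
        return bool(self.pending)

    def add_task(self, name: str, save: bool = True) -> None:
        """Queue a change and, if requested, try to commit straight away."""
        self.pending.append(name)
        if save:
            self.save()

    def save(self) -> None:
        """Commit pending changes unless a batch update is in progress."""
        if self.in_batch_mode:
            return
        self.committed.extend(self.pending)
        self.pending.clear()


store = ToyPendingStore()
store.add_task("a")  # committed immediately
store.add_task("b", save=False)  # left pending
pending_before = store.has_pending
store.save()  # commits "b"
```

Passing `save=False` lets a caller group several changes and commit them with a single `save()` call.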

set_EAR_end(EAR_ID, exit_code, success, save=True)

Mark an element action run as finished.

Parameters:
  • EAR_ID (int) –

  • exit_code (int) –

  • success (bool) –

  • save (bool) –

Return type:

datetime

set_EAR_skip(EAR_ID, save=True)

Mark an element action run as skipped.

Parameters:
  • EAR_ID (int) –

  • save (bool) –

Return type:

None

set_EAR_start(EAR_ID, save=True)

Mark an element action run as started.

Parameters:
  • EAR_ID (int) –

  • save (bool) –

Return type:

datetime

set_EAR_submission_index(EAR_ID, sub_idx, save=True)

Set the submission index for an element action run.

Parameters:
  • EAR_ID (int) –

  • sub_idx (int) –

  • save (bool) –

Return type:

None

set_EARs_initialised(iter_ID, save=True)

Mark the element action runs of an element iteration as initialised.

Parameters:
  • iter_ID (int) –

  • save (bool) –

Return type:

None

set_file(store_contents, is_input, param_id, path, contents=None, filename=None, clean_up=False, save=True)

Set details of a file, including whether it is associated with a parameter.

Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • param_id (int | None) –

  • path (Path | str) –

  • contents (str | None) –

  • filename (str | None) –

  • clean_up (bool) –

  • save (bool) –

set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)

Set the metadata for a job script.

Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • version_info (VersionInfo | None) –

  • submit_time (str | None) –

  • submit_hostname (str | None) –

  • submit_machine (str | None) –

  • submit_cmdline (list[str] | None) –

  • os_name (str | None) –

  • shell_name (str | None) –

  • scheduler_name (str | None) –

  • scheduler_job_ID (str | None) –

  • process_ID (int | None) –

  • save (bool) –

set_parameter_value(param_id, value, is_file=False, save=True)

Set the value of a parameter.

Parameters:
  • param_id (int) –

  • value (Any) –

  • is_file (bool) –

  • save (bool) –

property task_cache: dict[int, AnySTask]

Cache for persistent tasks.

property ts_fmt: str

The format for timestamps.
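
Assuming the timestamp format is a `strftime`-style format string (an assumption; this page only states that it is a `str`), a timestamp such as those returned by `set_EAR_start` and `set_EAR_end` could be written out and read back as sketched here. The value of `TS_FMT` is hypothetical and not taken from hpcflow.

```python
from datetime import datetime

TS_FMT = "%Y-%m-%d %H:%M:%S.%f"  # hypothetical format string for illustration

start = datetime(2024, 1, 2, 3, 4, 5, 600000)
as_text = start.strftime(TS_FMT)  # serialise for storage
round_tripped = datetime.strptime(as_text, TS_FMT)  # parse when loading
```

Using one shared format string for both directions keeps stored timestamps parseable.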

abstract unzip(path='.', log=None)

Convert this store into expanded form.

Parameters:
  • path (str) –

  • log (str | None) –

Return type:

str

update_loop_num_iters(index, num_added_iters, save=True)

Add iterations to a loop.

Parameters:
  • index –

  • num_added_iters –

  • save (bool) –

Return type:

None

update_loop_parents(index, num_added_iters, parents, save=True)

Set the parents of a loop.

Parameters:
  • index (int) –

  • num_added_iters (Mapping[tuple[int, ...], int]) –

  • parents (Sequence[str]) –

  • save (bool) –

Return type:

None

update_param_source(param_sources, save=True)

Set the source of a parameter.

Parameters:
  • param_sources –

  • save (bool) –

Return type:

None

property use_cache: bool

Whether to use a cache.

using_resource(res_label: Literal['metadata'], action: str) → AbstractContextManager[Metadata]
using_resource(res_label: Literal['submissions'], action: str) → AbstractContextManager[list[JSONDocument]]
using_resource(res_label: Literal['parameters'], action: str) → AbstractContextManager[dict[str, dict[str, Any]]]
using_resource(res_label: Literal['attrs'], action: str) → AbstractContextManager[ZarrAttrsDict]

Context manager for managing StoreResource objects associated with the store.
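
The overloaded signatures above tie each literal resource label to the type the context manager yields. The sketch below shows that general pattern with `typing.overload` and `Literal`. `ToyResourceStore`, its in-memory resources, and the `"update"` action string are hypothetical and do not reflect how hpcflow implements `StoreResource` handling.

```python
from contextlib import AbstractContextManager, contextmanager
from typing import Any, Iterator, Literal, overload


class ToyResourceStore:
    """Hypothetical store; the labels and payload types are illustrative."""

    def __init__(self) -> None:
        self._resources: dict[str, Any] = {
            "metadata": {"name": "demo"},
            "submissions": [],
        }
        self.log: list[tuple[str, str, str]] = []

    @overload
    def using_resource(
        self, res_label: Literal["metadata"], action: str
    ) -> AbstractContextManager[dict[str, Any]]: ...

    @overload
    def using_resource(
        self, res_label: Literal["submissions"], action: str
    ) -> AbstractContextManager[list[Any]]: ...

    @contextmanager
    def using_resource(self, res_label: str, action: str) -> Iterator[Any]:
        """Open the named resource, yield it, and always record the close."""
        self.log.append(("open", res_label, action))
        try:
            yield self._resources[res_label]
        finally:
            self.log.append(("close", res_label, action))


store = ToyResourceStore()
with store.using_resource("metadata", "update") as md:
    md["name"] = "renamed"
```

With the overloads in place, a static type checker can infer that `md` is a dictionary for the `'metadata'` label and a list for `'submissions'`, while a single implementation serves both at runtime.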

property workflow: Workflow

The workflow this relates to.

abstract classmethod write_empty_workflow(app, *, template_js, template_components_js, wk_path, fs, name, replaced_wk, creation_info, ts_fmt, ts_name_fmt)

Write an empty workflow.

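Parameters:
  • app –

  • template_js –

  • template_components_js –

  • wk_path –

  • fs –

  • name –

  • replaced_wk –

  • creation_info –

  • ts_fmt –

  • ts_name_fmt –

Return type: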

None

abstract zip(path='.', log=None, overwrite=False, include_execute=False, include_rechunk_backups=False)

Convert this store into archival form.

Parameters:
  • path (str) –

  • log (str | None) –

  • overwrite (bool) –

  • include_execute (bool) –

  • include_rechunk_backups (bool) –

Return type:

str