hpcflow.sdk.persistence.json.JSONPersistentStore#

class hpcflow.sdk.persistence.json.JSONPersistentStore(app, workflow, path, fs)#

Bases: PersistentStore

Methods

add_EAR

Add a new EAR to an element iteration.

add_element

Add a new element to a task.

add_element_iteration

Add a new iteration to an element.

add_element_set

add_file

add_loop

Add a new loop to the workflow.

add_set_parameter

add_submission

Add a new submission.

add_submission_part

add_task

Add a new task to the workflow.

add_template_components

add_unset_parameter

cache_ctx

Context manager for using the persistent element/iteration/run cache.

cached_load

Context manager to cache the metadata.

check_parameters_exist

For each parameter ID, return True if it exists, else False.

copy

Copy the workflow store.

delete

Delete the persistent workflow.

delete_no_confirm

Permanently delete the workflow data with no confirmation.

get_EAR_skipped

get_EARs

get_creation_info

get_element_iterations

get_elements

get_loops

Retrieve all loops, including pending.

get_loops_by_IDs

Retrieve loops by index (ID), including pending.

get_name

get_parameter_set_statuses

get_parameter_sources

get_parameters

Accepts additional keyword arguments (kwargs); see the parameter list in the full entry for this method.

get_submissions

Retrieve all submissions, including pending.

get_submissions_by_ID

get_task

get_task_elements

Get element data by indices within a given task.

get_tasks

Retrieve all tasks, including pending.

get_tasks_by_IDs

get_template

get_template_components

Get all template components, including pending.

get_ts_fmt

get_ts_name_fmt

make_test_store_from_spec

Generate a store for testing purposes.

prepare_test_store_from_spec

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

reinstate_replaced_dir

remove_path

Try very hard to delete a directory or file.

remove_replaced_dir

rename_path

Revert the replaced workflow path to its original name.

save

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end

set_EAR_skip

set_EAR_start

set_EAR_submission_index

set_EARs_initialised

set_file

set_jobscript_metadata

set_parameter_value

update_loop_num_iters

update_loop_parents

update_param_source

using_resource

Context manager for managing StoreResource objects associated with the store.

write_empty_workflow

Attributes

EAR_cache

Cache for persistent EARs.

element_cache

Cache for persistent elements.

element_iter_cache

Cache for persistent element iterations.

has_pending

is_submittable

Does this store support workflow submission?

logger

num_tasks_cache

Cache for number of persistent tasks.

param_sources_cache

Cache for persistent parameter sources.

parameter_cache

Cache for persistent parameters.

task_cache

Cache for persistent tasks.

ts_fmt

use_cache

property EAR_cache#

Cache for persistent EARs.

add_EAR(elem_iter_ID, action_idx, commands_idx, data_idx, metadata, save=True)#

Add a new EAR to an element iteration.

Parameters:
  • elem_iter_ID (int) –

  • action_idx (int) –

  • commands_idx (List[int]) –

  • data_idx (Dict) –

  • metadata (Dict) –

  • save (bool) –

Return type:

int

add_element(task_ID, es_idx, seq_idx, src_idx, save=True)#

Add a new element to a task.

Parameters:
  • task_ID (int) –

  • es_idx (int) –

  • seq_idx (Dict) –

  • src_idx (Dict) –

  • save (bool) –

add_element_iteration(element_ID, data_idx, schema_parameters, loop_idx=None, save=True)#

Add a new iteration to an element.

Parameters:
  • element_ID (int) –

  • data_idx (Dict) –

  • schema_parameters (List[str]) –

  • loop_idx (Dict | None) –

  • save (bool) –

Return type:

int

add_element_set(task_id, es_js, save=True)#
Parameters:
add_file(store_contents, is_input, source, path=None, contents=None, filename=None, save=True)#
Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • source (Dict) –

  • contents (str) –

  • filename (str) –

  • save (bool) –

add_loop(loop_template, iterable_parameters, parents, num_added_iterations, iter_IDs, save=True)#

Add a new loop to the workflow.

Parameters:
add_set_parameter(data, source, save=True)#
Parameters:
Return type:

int

add_submission(sub_idx, sub_js, save=True)#

Add a new submission.

Parameters:
add_submission_part(sub_idx, dt_str, submitted_js_idx, save=True)#
Parameters:
  • sub_idx (int) –

  • dt_str (str) –

  • submitted_js_idx (List[int]) –

  • save (bool) –

add_task(idx, task_template, save=True)#

Add a new task to the workflow.

Parameters:
  • idx (int) –

  • task_template (Dict) –

  • save (bool) –

add_template_components(temp_comps, save=True)#
Parameters:
Return type:

None

add_unset_parameter(source, save=True)#
Parameters:
Return type:

int

cache_ctx()#

Context manager for using the persistent element/iteration/run cache.

cached_load()#

Context manager to cache the metadata.

Return type:

Iterator[Dict]

check_parameters_exist(id_lst)#

For each parameter ID, return True if it exists, else False.

Parameters:

id_lst (Iterable[int]) –

Return type:

List[bool]

copy(path=None)#

Copy the workflow store.

This does not work on remote filesystems.

Return type:

str

delete()#

Delete the persistent workflow.

Return type:

None

delete_no_confirm()#

Permanently delete the workflow data with no confirmation.

Return type:

None

property element_cache#

Cache for persistent elements.

property element_iter_cache#

Cache for persistent element iterations.

get_EAR_skipped(EAR_ID)#
Parameters:

EAR_ID (int) –

Return type:

bool

get_EARs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySEAR]

get_creation_info()#
get_element_iterations(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySElementIter]

get_elements(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySElement]

get_loops()#

Retrieve all loops, including pending.

Return type:

Dict[int, Dict]

get_loops_by_IDs(id_lst)#

Retrieve loops by index (ID), including pending.

Parameters:

id_lst (Iterable[int]) –

Return type:

Dict[int, Dict]

get_name()#
get_parameter_set_statuses(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[bool]

get_parameter_sources(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[Dict]

get_parameters(id_lst, **kwargs)#
Parameters:
  • kwargs (Dict) –

    dataset_copy (bool)

    For Zarr stores only. If True, copy arrays as NumPy arrays.

  • id_lst (Iterable[int]) –

Return type:

List[AnySParameter]

get_submissions()#

Retrieve all submissions, including pending.

Return type:

Dict[int, Dict]

get_submissions_by_ID(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

Dict[int, Dict]

get_task(task_idx)#
Parameters:

task_idx (int) –

Return type:

AnySTask

get_task_elements(task_id, idx_lst=None)#

Get element data by indices within a given task.

Element iterations and EARs belonging to the elements are included.

Parameters:

idx_lst (Iterable[int] | None) –

Return type:

List[Dict]

get_tasks()#

Retrieve all tasks, including pending.

Return type:

List[AnySTask]

get_tasks_by_IDs(id_lst)#
Parameters:

id_lst (Iterable[int]) –

Return type:

List[AnySTask]

get_template()#
Return type:

Dict

get_template_components()#

Get all template components, including pending.

Return type:

Dict

get_ts_fmt()#
get_ts_name_fmt()#
property has_pending#
property is_submittable#

Does this store support workflow submission?

property logger#
classmethod make_test_store_from_spec(app, spec, dir=None, path='test_store.json', overwrite=False)#

Generate a store for testing purposes.

property num_tasks_cache#

Cache for number of persistent tasks.

property param_sources_cache#

Cache for persistent parameter sources.

property parameter_cache#

Cache for persistent parameters.

static prepare_test_store_from_spec(task_spec)#

Generate a valid store from a specification in terms of nested elements/iterations/EARs.

reinstate_replaced_dir()#
Return type:

None

remove_path(path, fs)#

Try very hard to delete a directory or file.

Dropbox (on Windows, at least) seems to try to re-sync files if the parent directory is deleted soon after creation, which is the case on a failed workflow creation (e.g. missing inputs). So, in addition to catching PermissionErrors generated when Dropbox has a lock on files, we repeatedly try deleting the directory tree.

Parameters:

path (str) –

Return type:

None

remove_replaced_dir()#
Return type:

None

rename_path(replaced, original, fs)#

Revert the replaced workflow path to its original name.

This happens when new workflow creation fails and there is an existing workflow with the same name; the original workflow, which was renamed, must be reverted.

Parameters:
  • replaced (str) –

  • original (str) –

Return type:

None

save()#

Commit pending changes to disk, if not in batch-update mode.

set_EAR_end(EAR_ID, exit_code, success, save=True)#
Parameters:
  • EAR_ID (int) –

  • exit_code (int) –

  • success (bool) –

  • save (bool) –

Return type:

datetime

set_EAR_skip(EAR_ID, save=True)#
Parameters:
Return type:

None

set_EAR_start(EAR_ID, save=True)#
Parameters:
Return type:

datetime

set_EAR_submission_index(EAR_ID, sub_idx, save=True)#
Parameters:
  • EAR_ID (int) –

  • sub_idx (int) –

  • save (bool) –

Return type:

None

set_EARs_initialised(iter_ID, save=True)#
Parameters:
  • iter_ID (int) –

  • save (bool) –

Return type:

None

set_file(store_contents, is_input, param_id=None, path=None, contents=None, filename=None, clean_up=False, save=True)#
Parameters:
  • store_contents (bool) –

  • is_input (bool) –

  • param_id (int) –

  • contents (str) –

  • filename (str) –

  • clean_up (bool) –

  • save (bool) –

set_jobscript_metadata(sub_idx, js_idx, version_info=None, submit_time=None, submit_hostname=None, submit_machine=None, submit_cmdline=None, os_name=None, shell_name=None, scheduler_name=None, scheduler_job_ID=None, process_ID=None, save=True)#
Parameters:
  • sub_idx (int) –

  • js_idx (int) –

  • version_info (Dict | None) –

  • submit_time (str | None) –

  • submit_hostname (str | None) –

  • submit_machine (str | None) –

  • submit_cmdline (List[str] | None) –

  • os_name (str | None) –

  • shell_name (str | None) –

  • scheduler_name (str | None) –

  • scheduler_job_ID (str | None) –

  • process_ID (int | None) –

  • save (bool) –

set_parameter_value(param_id, value, is_file=False, save=True)#
Parameters:
  • param_id (int) –

  • value (Any) –

  • is_file (bool) –

  • save (bool) –

property task_cache#

Cache for persistent tasks.

property ts_fmt: str#
update_loop_num_iters(index, num_added_iters, save=True)#
Parameters:
  • index (int) –

  • num_added_iters (int) –

  • save (bool) –

Return type:

None

update_loop_parents(index, num_added_iters, parents, save=True)#
Parameters:
Return type:

None

update_param_source(param_id, source, save=True)#
Parameters:
Return type:

None

property use_cache#
using_resource(res_label, action)#

Context manager for managing StoreResource objects associated with the store.

classmethod write_empty_workflow(app, template_js, template_components_js, wk_path, fs, name, replaced_wk, creation_info, ts_fmt, ts_name_fmt)#
Parameters:
  • template_js (Dict) –

  • template_components_js (Dict) –

  • wk_path (str) –

  • name (str) –

  • replaced_wk (str) –

  • creation_info (Dict) –

  • ts_fmt (str) –

  • ts_name_fmt (str) –

Return type:

None