hpcflow.sdk.persistence.pending.PendingChanges#

class hpcflow.sdk.persistence.pending.PendingChanges(app, store, resource_map)#

Bases: Generic[AnySTask, AnySElement, AnySElementIter, AnySEAR, AnySParameter]

Class to store pending changes and merge them into a persistent store.

Parameters:
  • app (App) – The main application context.

  • store (PersistentStore) – The persistent store that owns this object.

  • resource_map (CommitResourceMap) – Map of resources, used when processing commits.
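
The stage-then-commit life cycle that this class supports can be sketched with a pair of toy classes. This is only an illustration under stated assumptions, not hpcflow's implementation: `ToyStore` and `ToyPendingChanges` are hypothetical names, a plain dict stands in for data on disk, and only one kind of pending data (`add_tasks`) is modelled.

```python
class ToyStore:
    """Hypothetical persistent store; a dict stands in for data on disk."""

    def __init__(self) -> None:
        self.tasks: dict[int, str] = {}
        self.pending = ToyPendingChanges(store=self)


class ToyPendingChanges:
    """Buffers changes in memory until they are committed to the store."""

    def __init__(self, store: ToyStore) -> None:
        self.store = store
        self.reset()

    def reset(self) -> None:
        # Clear all pending data so that new changes can be staged.
        self.add_tasks: dict[int, str] = {}

    def commit_tasks(self) -> None:
        # Merge staged tasks into the store, then empty the buffer.
        self.store.tasks.update(self.add_tasks)
        self.add_tasks = {}

    def commit_all(self) -> None:
        # Run every commit method (only one here), then start afresh.
        self.commit_tasks()
        self.reset()


store = ToyStore()
store.pending.add_tasks[0] = "task-0"  # staged, not yet persistent
staged_only = dict(store.tasks)        # snapshot taken before committing
store.pending.commit_all()
```

In this sketch the store only changes when `commit_all` runs, which is the property that lets several edits be batched into one write.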

Methods

commit_EAR_ends

Commit pending element action run finish information to disk.

commit_EAR_skips

Commit pending element action skip flags to disk.

commit_EAR_starts

Commit pending element action run start information to disk.

commit_EAR_submission_indices

Commit pending element action run submission index updates to disk.

commit_EARs

Commit pending element action runs to disk.

commit_EARs_initialised

Commit pending element action run init state updates to disk.

commit_all

Commit all pending changes to disk.

commit_at_submit_metadata

Commit to disk pending at-submit-time metadata, including submission parts.

commit_elem_IDs

Commit pending element ID updates to disk.

commit_elem_iter_EAR_IDs

Commit pending element action run ID updates to disk.

commit_elem_iter_IDs

Commit pending element iteration ID updates to disk.

commit_elem_iters

Commit pending element iterations to disk.

commit_element_sets

Commit pending element sets to disk.

commit_elements

Commit pending elements to disk.

commit_files

Add pending files to the files directory.

commit_iter_data_idx

commit_js_metadata

Commit pending jobscript metadata changes to disk.

commit_loop_indices

Make pending update to element iteration loop indices persistent.

commit_loop_num_iters

Make pending update to the number of loop iterations.

commit_loop_parents

Make pending update to additional loop parents.

commit_loops

Commit pending loops to disk.

commit_param_sources

Make pending changes to parameter sources persistent.

commit_parameters

Make pending parameters persistent.

commit_run_data_idx

commit_set_run_dirs

Commit pending run directory indices.

commit_submissions

Commit pending submissions to disk.

commit_tasks

Commit pending tasks to disk.

commit_template_components

Commit pending template components to disk.

commits_data

Decorator for PendingChanges.commit_* methods. Its arguments list the PendingChanges attributes that must hold non-trivial data for the method to need invoking.

get_pending_resource_map_groups

Retrieve resource map groups, where values are filtered to include only those commit methods that must be invoked, due to pending data associated with those methods.

reset

Clear all pending data and prepare to accept new pending data.

where_pending

Get the list of items for which there are outstanding pending changes.

Attributes

logger

The logger.

add_tasks

Keys are new task IDs.

add_loops

Keys are loop IDs, values are loop descriptors.

add_submissions

Keys are submission IDs, values are submission descriptors.

add_elements

Keys are element IDs.

add_elem_iters

Keys are element iteration IDs.

add_EARs

Keys are element action run IDs.

add_parameters

Keys are parameter indices and values are tuples whose first element is data to add and whose second element is the source dict for the new data.

add_files

Workflow-related files (inputs, outputs) added to the persistent store.

add_template_components

Template components to add.

add_element_sets

Keys are element set IDs, values are descriptors.

add_elem_IDs

Keys are task IDs, and values are element IDs to add to that task.

add_elem_iter_IDs

Keys are element IDs, and values are iteration IDs to add to that element.

add_elem_iter_EAR_IDs

Keys are element iteration IDs, then EAR action index, and values are EAR IDs.

update_at_submit_metadata

Submission metadata added at submit-time, including submission parts.

set_EARs_initialised

IDs of EARs to mark as initialised.

set_EAR_submission_data

Submission IDs and commands file IDs to attach to EARs.

set_EAR_skips

IDs of EARs to mark as skipped.

set_EAR_starts

Keys are EAR IDs and values are tuples of start time, start dir snapshot, run hostname, and port number.

set_EAR_ends

Keys are EAR IDs and values are tuples of end time, end dir snapshot, exit code, and success boolean.

set_run_dirs

Each list item is a tuple of two arrays, the first of which is a run directory indices array, and the second of which is an integer array indicating with which run ID each run directory is associated.

set_js_metadata

Keys are IDs of jobscripts.

set_parameters

Keys are IDs of parameters to add or modify, and values are tuples of the parameter value, and whether the parameter is a file.

update_param_sources

Keys are parameter indices and values are dict parameter sources to merge with existing source of that parameter.

update_loop_indices

Keys are indices of loops, values are descriptions of what to update.

update_loop_num_iters

Keys are indices of loops, values are numbers of iterations.

update_loop_parents

Keys are indices of loops, values are lists of parent names.

add_EARs: dict[int, AnySEAR]#

Keys are element action run IDs.

add_elem_IDs: dict[int, list[int]]#

Keys are task IDs, and values are element IDs to add to that task.

add_elem_iter_EAR_IDs: dict[int, dict[int, list[int]]]#

Keys are element iteration IDs, then EAR action index, and values are EAR IDs. This is a list of EAR IDs to add to a given element iteration action.
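
The two-level layout described above can be made concrete with a short sketch. The use of `defaultdict` and the helper `queue_EAR_ID` are assumptions made for illustration; the documentation only states the `dict[int, dict[int, list[int]]]` shape.

```python
from collections import defaultdict

# element iteration ID -> action index -> list of EAR IDs to add
add_elem_iter_EAR_IDs: dict[int, dict[int, list[int]]] = defaultdict(
    lambda: defaultdict(list)
)


def queue_EAR_ID(iter_ID: int, act_idx: int, EAR_ID: int) -> None:
    """Hypothetical helper: stage one EAR ID for an iteration's action."""
    add_elem_iter_EAR_IDs[iter_ID][act_idx].append(EAR_ID)


queue_EAR_ID(7, 0, 100)
queue_EAR_ID(7, 0, 101)
queue_EAR_ID(7, 1, 102)

# Convert to plain dicts to show the resulting structure.
plain = {k: dict(v) for k, v in add_elem_iter_EAR_IDs.items()}
```

Here `plain` is `{7: {0: [100, 101], 1: [102]}}`: iteration 7 has two runs queued for action 0 and one for action 1.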

add_elem_iter_IDs: dict[int, list[int]]#

Keys are element IDs, and values are iteration IDs to add to that element.

add_elem_iters: dict[int, AnySElementIter]#

Keys are element iteration IDs.

add_element_sets: dict[int, list[Mapping]]#

Keys are element set IDs, values are descriptors.

add_elements: dict[int, AnySElement]#

Keys are element IDs.

add_files: list[FileDescriptor]#

Workflow-related files (inputs, outputs) added to the persistent store.

add_loops: dict[int, LoopDescriptor]#

Keys are loop IDs, values are loop descriptors.

add_parameters: dict[int, AnySParameter]#

Keys are parameter indices and values are tuples whose first element is data to add and whose second element is the source dict for the new data.

add_submissions: dict[int, Mapping[str, JSONed]]#

Keys are submission IDs, values are submission descriptors.

add_tasks: dict[int, AnySTask]#

Keys are new task IDs.

add_template_components: dict[str, dict[str, dict]]#

Template components to add.

commit_EAR_ends()#

Commit pending element action run finish information to disk.

Return type:

None

commit_EAR_skips()#

Commit pending element action skip flags to disk.

Return type:

None

commit_EAR_starts()#

Commit pending element action run start information to disk.

Return type:

None

commit_EAR_submission_indices()#

Commit pending element action run submission index updates to disk.

Return type:

None

commit_EARs()#

Commit pending element action runs to disk.

Return type:

None

commit_EARs_initialised()#

Commit pending element action run init state updates to disk.

Return type:

None

commit_all()#

Commit all pending changes to disk.

Return type:

None

commit_at_submit_metadata()#

Commit to disk pending at-submit-time metadata, including submission parts.

Return type:

None

commit_elem_IDs()#

Commit pending element ID updates to disk.

Return type:

None

commit_elem_iter_EAR_IDs()#

Commit pending element action run ID updates to disk.

Return type:

None

commit_elem_iter_IDs()#

Commit pending element iteration ID updates to disk.

Return type:

None

commit_elem_iters()#

Commit pending element iterations to disk.

Return type:

None

commit_element_sets()#

Commit pending element sets to disk.

Return type:

None

commit_elements()#

Commit pending elements to disk.

Return type:

None

commit_files()#

Add pending files to the files directory.

Return type:

None

commit_iter_data_idx()#

Return type:

None

commit_js_metadata()#

Commit pending jobscript metadata changes to disk.

Return type:

None

commit_loop_indices()#

Make pending update to element iteration loop indices persistent.

Return type:

None

commit_loop_num_iters()#

Make pending update to the number of loop iterations.

Return type:

None

commit_loop_parents()#

Make pending update to additional loop parents.

Return type:

None

commit_loops()#

Commit pending loops to disk.

Return type:

None

commit_param_sources()#

Make pending changes to parameter sources persistent.

Return type:

None

commit_parameters()#

Make pending parameters persistent.

Return type:

None

commit_run_data_idx()#

Return type:

None

commit_set_run_dirs()#

Commit pending run directory indices.

Return type:

None

commit_submissions()#

Commit pending submissions to disk.

Return type:

None

commit_tasks()#

Commit pending tasks to disk.

Return type:

None

commit_template_components()#

Commit pending template components to disk.

Return type:

None

commits_data()#

Decorator for PendingChanges.commit_* methods. Its arguments list the PendingChanges attributes that must hold non-trivial data for the method’s invocation to be required.

Notes

This essentially provides a mapping between PendingChanges attributes and commit_* methods. This allows us to only open the resources that need updating in PendingChanges.commit_all.

We use a decorator rather than explicitly declaring the map in _commit_method_data_map to make the mapping obvious near the commit methods, and hopefully avoid us forgetting to update _commit_method_data_map when we modify or add commit methods in future!

Parameters:

data_list (str) –
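
The registration idea in the notes above can be sketched as follows. This is a minimal illustration, not the library's code: the module-level `COMMIT_METHOD_DATA_MAP`, the class `ToyPending` and the method `required_commits` are hypothetical, and the real decorator may wrap the method rather than return it unchanged.

```python
from typing import Callable

# Hypothetical registry: commit method name -> names of the pending-data
# attributes that must be non-empty for that method to be worth calling.
COMMIT_METHOD_DATA_MAP: dict[str, tuple[str, ...]] = {}


def commits_data(*data_list: str) -> Callable:
    """Record which pending attributes the decorated commit method consumes."""

    def decorator(func: Callable) -> Callable:
        COMMIT_METHOD_DATA_MAP[func.__name__] = data_list
        return func

    return decorator


class ToyPending:
    """Stand-in for a pending-changes container (illustrative only)."""

    def __init__(self) -> None:
        self.add_tasks: dict[int, str] = {}
        self.add_loops: dict[int, str] = {}

    @commits_data("add_tasks")
    def commit_tasks(self) -> None:
        self.add_tasks = {}

    @commits_data("add_loops")
    def commit_loops(self) -> None:
        self.add_loops = {}

    def required_commits(self) -> list[str]:
        """Names of commit methods with at least one non-empty attribute."""
        return [
            name
            for name, attrs in COMMIT_METHOD_DATA_MAP.items()
            if any(getattr(self, attr) for attr in attrs)
        ]


pending = ToyPending()
pending.add_tasks[0] = "task-0"
required = pending.required_commits()
```

With only `add_tasks` populated, `required` contains just `"commit_tasks"`. Because each mapping entry sits next to the method it describes, adding a new commit method without declaring its data is harder to overlook.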

get_pending_resource_map_groups()#

Retrieve resource map groups, where values are filtered to include only those commit methods that must be invoked, due to pending data associated with those methods.

Notes

This method allows us to open only those resources that need to be updated, given the state of pending data.

Return type:

dict[tuple[str, …], list[str]]
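
The filtering described above might look roughly like the sketch below. The function `filter_resource_groups`, the resource names, and the choice to drop groups that end up with no methods are all assumptions made for illustration; only the return shape (tuples of resource names mapped to lists of method names) comes from the signature.

```python
def filter_resource_groups(
    groups: dict[tuple[str, ...], list[str]],
    required_methods: set[str],
) -> dict[tuple[str, ...], list[str]]:
    """Keep only commit methods that must run; drop groups left empty."""
    filtered: dict[tuple[str, ...], list[str]] = {}
    for resources, methods in groups.items():
        needed = [m for m in methods if m in required_methods]
        if needed:
            filtered[resources] = needed
    return filtered


# Hypothetical resource names; the real ones are store-specific.
groups = {
    ("metadata",): ["commit_tasks", "commit_loops"],
    ("parameters", "attrs"): ["commit_parameters"],
}
result = filter_resource_groups(groups, {"commit_loops"})
```

Only the `("metadata",)` group survives in `result`, so a caller following this pattern would open that resource alone.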

property logger: Logger#

The logger.

reset(is_init=False)#

Clear all pending data and prepare to accept new pending data.

Parameters:

is_init (bool) –

Return type:

None

set_EAR_ends: dict[int, tuple[datetime, dict[str, Any] | None, int, bool]]#

Keys are EAR IDs and values are tuples of end time, end dir snapshot, exit code, and success boolean.

set_EAR_skips: dict[int, int]#

IDs of EARs to mark as skipped.

set_EAR_starts: dict[int, tuple[datetime, dict[str, Any] | None, str, int | None]]#

Keys are EAR IDs and values are tuples of start time, start dir snapshot, run hostname, and port number.
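
As an illustration of the value layout, the following sketch stages one start record and unpacks it. The EAR ID, hostname and timestamp are invented values, and `StartRecord` is a hypothetical alias that mirrors the annotated type.

```python
from datetime import datetime, timezone
from typing import Any, Optional

# (start time, start dir snapshot, run hostname, port number)
StartRecord = tuple[datetime, Optional[dict[str, Any]], str, Optional[int]]

set_EAR_starts: dict[int, StartRecord] = {}

# Stage a start record for EAR 42 with no snapshot and no port.
set_EAR_starts[42] = (
    datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    None,
    "node-01",
    None,
)

start_time, snapshot, hostname, port = set_EAR_starts[42]
```

The snapshot and port positions are optional in the annotated type, so code that reads these tuples should allow for `None` in both.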

set_EAR_submission_data: dict[int, tuple[int, int | None]]#

Submission IDs and commands file IDs to attach to EARs.

set_EARs_initialised: list[int]#

IDs of EARs to mark as initialised.

set_js_metadata: dict[int, dict[int, dict[str, Any]]]#

Keys are IDs of jobscripts.

set_parameters: dict[int, tuple[Any, bool]]#

Keys are IDs of parameters to add or modify, and values are tuples of the parameter value, and whether the parameter is a file.

set_run_dirs: list[tuple[np.ndarray, np.ndarray]]#

Each list item is a tuple of two arrays, the first of which is a run directory indices array, and the second of which is an integer array indicating with which run ID each run directory is associated.

update_at_submit_metadata: dict[int, dict[str, Any]]#

Submission metadata added at submit-time, including submission parts.

update_loop_indices: dict[int, dict[str, int]]#

Keys are indices of loops, values are descriptions of what to update.

update_loop_num_iters: dict[int, list[list[list[int] | int]]]#

Keys are indices of loops, values are numbers of iterations.

update_loop_parents: dict[int, list[str]]#

Keys are indices of loops, values are lists of parent names.

update_param_sources: dict[int, ParamSource]#

Keys are parameter indices and values are dict parameter sources to merge with existing source of that parameter.
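
One plausible reading of "merge with existing source" is a shallow dict update per parameter, sketched below. The function `merge_param_sources` and the source keys shown are hypothetical, and the real merge may handle nested or conflicting keys differently.

```python
from typing import Any


def merge_param_sources(
    existing: dict[int, dict[str, Any]],
    updates: dict[int, dict[str, Any]],
) -> dict[int, dict[str, Any]]:
    """Return a copy of `existing` with each update shallowly merged in."""
    merged = {idx: dict(src) for idx, src in existing.items()}
    for idx, new_src in updates.items():
        merged.setdefault(idx, {}).update(new_src)
    return merged


# Hypothetical source dicts keyed by parameter index.
existing = {0: {"type": "local_input"}}
updates = {0: {"task_insert_ID": 2}, 1: {"type": "default_input"}}
merged = merge_param_sources(existing, updates)
```

The sketch copies the existing sources first, so the input mapping is left unmodified.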

where_pending()#

Get the list of items for which there are outstanding pending changes.

Return type:

list[str]
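
A check of this kind could be sketched as below, assuming that the returned strings are the names of the attributes that currently hold data. That interpretation, the standalone function signature, and the use of `SimpleNamespace` as a stand-in container are all assumptions.

```python
from types import SimpleNamespace


def where_pending(pending: SimpleNamespace, attr_names: list[str]) -> list[str]:
    """Names of attributes whose containers are currently non-empty."""
    return [name for name in attr_names if getattr(pending, name)]


# Stand-in for a pending-changes object with three containers.
pending = SimpleNamespace(
    add_tasks={0: "task-0"},
    add_loops={},
    set_EARs_initialised=[3, 4],
)
names = where_pending(
    pending, ["add_tasks", "add_loops", "set_EARs_initialised"]
)
```

Here `names` lists `add_tasks` and `set_EARs_initialised`, since the empty `add_loops` dict is falsy and is skipped.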