hpcflow.sdk.core package#

Submodules#

hpcflow.sdk.core.actions module#

class hpcflow.sdk.core.actions.Action(environments, commands=None, script=None, input_file_generators=None, output_file_parsers=None, input_files=None, output_files=None, rules=None)#

Bases: JSONLike

Parameters:
expand()#
generate_data_index(act_idx, EAR_idx, schema_data_idx, all_data_idx, workflow, param_source)#

Generate the data index for this action of an element iteration whose overall data index is passed.

get_command_input_types()#

Get parameter types from commands.

Return type:

Tuple[str]

get_command_output_types()#

Get parameter types from command stdout and stderr arguments.

Return type:

Tuple[str]

get_commands_action_env()#
get_input_file_generator_action_env(input_file_generator)#
Parameters:

input_file_generator (InputFileGenerator) –

get_input_file_labels()#
get_input_types()#

Get the input types that are consumed by commands and input file generators of this action.

Return type:

Tuple[str]

get_output_file_labels()#
get_output_file_parser_action_env(output_file_parser)#
Parameters:

output_file_parser (OutputFileParser) –

get_output_types()#

Get the output types that are produced by command standard outputs and errors, and by output file parsers of this action.

Return type:

Tuple[str]

get_parameter_dependence(parameter)#

Find if/where a given parameter is used by the action.

Parameters:

parameter (SchemaParameter) –

get_possible_scopes()#

Get the action scopes that are inclusive of this action, ordered by decreasing specificity.

Return type:

Tuple[ActionScope]

get_precise_scope()#
Return type:

ActionScope

get_resolved_action_env(relevant_scopes, input_file_generator=None, output_file_parser=None, commands=None)#
Parameters:
get_script_path(script_name)#

Return the script path, relative to the EAR directory.

is_input_type_required(typ, provided_files)#
Parameters:
  • typ (str) –

  • provided_files (List[FileSpec]) –

Return type:

bool

property task_schema#
class hpcflow.sdk.core.actions.ActionEnvironment(environment: 'Environment', scope: 'Optional[ActionScope]' = None)#

Bases: JSONLike

Parameters:
environment: Environment#
scope: ActionScope | None = None#
class hpcflow.sdk.core.actions.ActionRule(check_exists=None, check_missing=None, rule=None)#

Bases: JSONLike

Class to represent a rule/condition that must be True if an action is to be included.

Parameters:
  • check_exists (str | None) –

  • check_missing (str | None) –

  • rule (Rule | None) –

check_exists: str | None = None#
check_missing: str | None = None#
rule: Rule | None = None#
class hpcflow.sdk.core.actions.ActionScope(typ, **kwargs)#

Bases: JSONLike

Class to represent the identification of a subset of task schema actions by a filtering process.

Parameters:

typ (Union[ActionScopeType, str]) –

classmethod any()#
classmethod from_json_like(json_like, shared_data=None)#
Parameters:
  • json_like

  • shared_data (dict of (str: ObjectList)) –

classmethod input_file_generator(file=None)#
classmethod main()#
classmethod output_file_parser(output=None)#
classmethod processing()#
to_string()#
class hpcflow.sdk.core.actions.ActionScopeType(value)#

Bases: Enum

An enumeration.

ANY = 0#
INPUT_FILE_GENERATOR = 3#
MAIN = 1#
OUTPUT_FILE_PARSER = 4#
PROCESSING = 2#
class hpcflow.sdk.core.actions.EARSubmissionStatus(value)#

Bases: Enum

An enumeration.

COMPLETE = 4#
PENDING = 0#
PREPARED = 1#
RUNNING = 3#
SUBMITTED = 2#
class hpcflow.sdk.core.actions.EAR_ID(task_insert_ID: 'int', element_idx: 'int', iteration_idx: 'int', action_idx: 'int', run_idx: 'int', EAR_idx: 'int')#

Bases: IterationID

Parameters:
  • task_insert_ID (int) –

  • element_idx (int) –

  • iteration_idx (int) –

  • action_idx (int) –

  • run_idx (int) –

  • EAR_idx (int) –

EAR_idx: int#
action_idx: int#
get_iteration_ID()#
run_idx: int#
class hpcflow.sdk.core.actions.ElementAction(element_iteration, action_idx, runs)#

Bases: object

property action#
property action_idx#
property element#
property element_iteration#
get(path=None, run_idx=-1, default=None, raise_on_missing=False)#
Parameters:
  • path (str | None) –

  • run_idx (int) –

  • default (Any | None) –

  • raise_on_missing (bool) –

get_data_idx(path=None, run_idx=-1)#
Parameters:
  • path (str | None) –

  • run_idx (int) –

get_parameter_names(prefix)#
get_parameter_sources(path=None, run_idx=-1, typ=None, as_strings=False, use_task_index=False)#
Parameters:
  • path (str | None) –

  • run_idx (int) –

  • typ (str | None) –

  • as_strings (bool) –

  • use_task_index (bool) –

property input_files#
property inputs#
property num_runs#
property output_files#
property outputs#
property runs#
property task#
class hpcflow.sdk.core.actions.ElementActionOLD(element: 'Element', root_action: 'Action', commands: 'List[Command]', input_file_generator: 'Optional[InputFileGenerator]' = None, output_parser: 'Optional[OutputFileParser]' = None)#

Bases: object

Parameters:
commands: List[Command]#
element: Element#
execute()#
get_environment()#
input_file_generator: InputFileGenerator | None = None#
output_parser: OutputFileParser | None = None#
root_action: Action#
class hpcflow.sdk.core.actions.ElementActionRun(element_action, run_idx, index, data_idx, metadata)#

Bases: object

Parameters:
  • run_idx (int) –

  • index (int) –

  • data_idx (Dict) –

  • metadata (Dict) –

property EAR_ID#

EAR index object.

property action#
compose_commands(jobscript)#
Returns:

  • commands

  • shell_vars – List of shell variable names that must be saved as workflow parameter data as strings.

Parameters:

jobscript (Jobscript) –

Return type:

Tuple[str, List[str]]

compose_source()#

Generate the file contents of this source.

Return type:

str

property data_idx#
property element#
property element_action#
property element_iteration#
property end_time#
get(path=None, default=None, raise_on_missing=False)#
Parameters:
  • path (str | None) –

  • default (Any | None) –

  • raise_on_missing (bool) –

get_EAR_dependencies(as_objects=False)#

Get EARs that this EAR depends on.

get_IFG_input_values()#
Return type:

Dict[str, Any]

get_OFP_output_files()#
Return type:

Dict[str, str | List[str]]

get_data_idx(path=None)#
Parameters:

path (str | None) –

get_environment()#
get_input_dependencies()#

Get information about locally defined input, sequence, and schema-default values that this EAR depends on. Note this does not get values from this EAR’s task/schema, because the aim of this method is to help determine which upstream tasks this EAR depends on.

get_input_values()#
Return type:

Dict[str, Any]

get_parameter_names(prefix)#
get_parameter_sources(path=None, typ=None, as_strings=False, use_task_index=False)#
Parameters:
  • path (str | None) –

  • typ (str | None) –

  • as_strings (bool) –

  • use_task_index (bool) –

get_resources()#

Resolve specific resources for this EAR, considering all applicable scopes and template-level resources.

get_template_resources()#

Get template-level resources.

property index#

Task-wide EAR index.

property input_files#
property inputs#
property metadata#
property output_files#
property outputs#
property resources#
property run_idx#
property start_time#
property submission_idx#
property submission_status#
property success#
property task#
property workflow#
write_source()#
class hpcflow.sdk.core.actions.ElementID(task_insert_ID: 'int', element_idx: 'int')#

Bases: object

Parameters:
  • task_insert_ID (int) –

  • element_idx (int) –

element_idx: int#
task_insert_ID: int#
class hpcflow.sdk.core.actions.IterationID(task_insert_ID, element_idx, iteration_idx)#

Bases: ElementID

Parameters:
  • iteration_idx (int) – Index into the element_iterations list/array of the task. Note this is NOT the index into the local list of ElementIterations belonging to an Element.

  • task_insert_ID (int) –

  • element_idx (int) –

get_element_ID()#
iteration_idx: int#

hpcflow.sdk.core.command_files module#

class hpcflow.sdk.core.command_files.FileNameExt(file_name: 'FileNameSpec')#

Bases: JSONLike

Parameters:

file_name (FileNameSpec) –

file_name: FileNameSpec#
value(directory=None)#
class hpcflow.sdk.core.command_files.FileNameSpec(name, args=None, is_regex=False)#

Bases: JSONLike

property ext#
property stem#
value(directory=None)#
class hpcflow.sdk.core.command_files.FileNameStem(file_name: 'FileNameSpec')#

Bases: JSONLike

Parameters:

file_name (FileNameSpec) –

file_name: FileNameSpec#
value(directory=None)#
class hpcflow.sdk.core.command_files.FileSpec(label: 'str', name: 'str', _hash_value: 'Optional[str]' = None)#

Bases: JSONLike

Parameters:
  • label (str) –

  • name (str) –

  • _hash_value (str | None) –

property ext#
label: str#
name: str#
property stem#
value(directory=None)#
class hpcflow.sdk.core.command_files.InputFile(file, path=None, contents=None, extension='', store_contents=True)#

Bases: _FileContentsSpecifier

Parameters:
  • file (Union[FileSpec, str]) –

  • path (Optional[Union[Path, str]]) –

  • contents (Optional[str]) –

  • extension (Optional[str]) –

  • store_contents (Optional[bool]) –

property normalised_files_path#
property normalised_path#
to_dict()#
class hpcflow.sdk.core.command_files.InputFileGenerator(input_file: 'FileSpec', inputs: 'List[Parameter]', script: 'str' = None, environment: 'Environment' = None)#

Bases: JSONLike

Parameters:
compose_source()#

Generate the file contents of this input file generator source.

Return type:

str

environment: Environment = None#
get_action_rule()#

Get the rule that allows testing if this input file generator must be run or not for a given element.

input_file: FileSpec#
inputs: List[Parameter]#
script: str = None#
write_source(action)#
class hpcflow.sdk.core.command_files.InputFileGeneratorSource(generator, path=None, contents=None, extension='')#

Bases: _FileContentsSpecifier

Parameters:
  • generator (InputFileGenerator) –

  • path (Union[Path, str]) –

  • contents (str) –

  • extension (str) –

class hpcflow.sdk.core.command_files.OutputFileParser(output: 'Parameter', output_files: 'List[FileSpec]', script: 'str' = None, environment: 'Environment' = None, inputs: 'List[str]' = None, options: 'Dict' = None)#

Bases: JSONLike

Parameters:
  • output (Parameter) –

  • output_files (List[FileSpec]) –

  • script (str) –

  • environment (Environment) –

  • inputs (List[str]) –

  • options (Dict) –

compose_source()#

Generate the file contents of this output file parser source.

Return type:

str

environment: Environment = None#
inputs: List[str] = None#
options: Dict = None#
output: Parameter#
output_files: List[FileSpec]#
script: str = None#
write_source(action)#
class hpcflow.sdk.core.command_files.OutputFileParserSource(parser, path=None, contents=None, extension='')#

Bases: _FileContentsSpecifier

Parameters:
  • parser (OutputFileParser) –

  • path (Union[Path, str]) –

  • contents (str) –

  • extension (str) –

hpcflow.sdk.core.commands module#

class hpcflow.sdk.core.commands.Command(command: str, arguments: List[Any] = None, stdout: str = None, stderr: str = None, stdin: str = None)#

Bases: JSONLike

Parameters:
  • command (str) –

  • arguments (List[Any]) –

  • stdout (str) –

  • stderr (str) –

  • stdin (str) –

arguments: List[Any] = None#
command: str#
get_output_types()#
stderr: str = None#
stdin: str = None#
stdout: str = None#
class hpcflow.sdk.core.commands.CommandArgument(parts)#

Bases: object

Parameters:

parts (list of any of str, File, Parameter) –

parts: List[Any]#

hpcflow.sdk.core.element module#

class hpcflow.sdk.core.element.Element(task, index, es_idx, seq_idx, src_idx, iterations)#

Bases: object

Parameters:
  • task (WorkflowTask) –

  • index (int) –

  • es_idx (int) –

  • seq_idx (Dict[str, int]) –

  • src_idx (Dict[str, int]) –

property action_runs: List[ElementActionRun]#

Get a list of element action runs from the latest iteration, where only the final run is taken for each element action.

property actions: Dict[ElementAction]#
property dir_name#
property element_ID#
property element_set#
property element_set_idx: int#
get(path=None, action_idx=None, run_idx=-1, default=None, raise_on_missing=False)#

Get element data of the most recent iteration from the persistent store.

Parameters:
  • path (str | None) –

  • action_idx (int | None) –

  • run_idx (int) –

  • default (Any | None) –

  • raise_on_missing (bool) –

Return type:

Any

get_EAR_dependencies(as_objects=False)#

Get EARs that the most recent iteration of this element depends on.

Parameters:

as_objects (bool) –

Return type:

List[Union[EAR_idx_type, ElementActionRun]]

get_data_idx(path=None, action_idx=None, run_idx=-1)#

Get the data index of the most recent element iteration.

Parameters:
  • action_idx (int | None) – The index of the action within the schema.

  • path (str | None) –

  • run_idx (int) –

Return type:

Dict[str, int]

get_dependent_EARs(as_objects=False)#

Get EARs that depend on the most recent iteration of this element.

Parameters:

as_objects (bool) –

Return type:

List[Union[EAR_idx_type, ElementActionRun]]

get_dependent_element_iterations(as_objects=False)#

Get element iterations that depend on the most recent iteration of this element.

Parameters:

as_objects (bool) –

Return type:

List[Tuple[int, int, int] | ElementIteration]

get_dependent_elements(as_objects=False)#

Get elements that depend on the most recent iteration of this element.

Parameters:

as_objects (bool) –

Return type:

List[Tuple[int, int] | Element]

get_dependent_elements_recursively(task_insert_ID=None)#

Get downstream elements that depend on this element, including recursive dependencies.

Dependencies are resolved using the initial iteration only. This method is used to identify from which element in the previous iteration a new iteration should be parametrised.

Parameters:

task_insert_ID – If specified, only return elements from this task.

get_dependent_tasks(as_objects=False)#

Get tasks that depend on the most recent iteration of this element.

Parameters:

as_objects (bool) –

Return type:

List[Union[int, WorkflowTask]]

get_element_dependencies(as_objects=False)#

Get elements that the most recent iteration of this element depends on.

Parameters:

as_objects (bool) –

Return type:

List[Tuple[int, int] | Element]

get_element_iteration_dependencies(as_objects=False)#

Get element iterations that the most recent iteration of this element depends on.

Parameters:

as_objects (bool) –

Return type:

List[Tuple[int, int, int] | ElementIteration]

get_input_dependencies()#

Get locally defined inputs/sequences/defaults from other tasks that the most recent iteration of this element depends on.

Return type:

Dict[str, Dict]

get_parameter_sources(path=None, action_idx=None, run_idx=-1, typ=None, as_strings=False, use_task_index=False)#

Get the parameter sources of the most recent element iteration.

Parameters:
  • use_task_index (bool) – If True, use the task index within the workflow, rather than the task insert ID.

  • path (str | None) –

  • action_idx (int | None) –

  • run_idx (int) –

  • typ (str | None) –

  • as_strings (bool) –

Return type:

Dict[str, str | Dict[str, Any]]

get_sequence_value(sequence_path)#
Parameters:

sequence_path (str) –

Return type:

Any

get_task_dependencies(as_objects=False)#

Get tasks (insert ID or WorkflowTask objects) that the most recent iteration of this element depends on.

Dependencies may come from either elements from upstream tasks, or from locally defined inputs/sequences/defaults from upstream tasks.

Parameters:

as_objects (bool) –

Return type:

List[Union[int, WorkflowTask]]

property index: int#

Get the index of the element within the task.

Note: the global_idx attribute returns the index of the element within the workflow, across all tasks.

init_loop_index(loop_name)#
Parameters:

loop_name (str) –

property input_files: ElementInputFiles#
property input_source_idx: Dict[str, int]#
property input_sources: Dict[str, InputSource]#
property inputs: ElementInputs#
property iterations: Dict[ElementAction]#
property latest_iteration#
property output_files: ElementOutputFiles#
property outputs: ElementOutputs#
property schema_parameters: List[str]#
property sequence_idx: Dict[str, int]#
property task: WorkflowTask#
to_element_set_data()#

Generate lists of workflow-bound InputValues and ResourceList.

property workflow: Workflow#
class hpcflow.sdk.core.element.ElementFilter(parameter_path: 'ParameterPath', condition: 'ConditionLike')#

Bases: object

Parameters:
  • parameter_path (ParameterPath) –

  • condition (ConditionLike) –

condition: ConditionLike#
parameter_path: ParameterPath#
class hpcflow.sdk.core.element.ElementGroup(name: 'str', where: 'Optional[ElementFilter]' = None, group_by_distinct: 'Optional[ParameterPath]' = None)#

Bases: object

Parameters:
group_by_distinct: ParameterPath | None = None#
name: str#
where: ElementFilter | None = None#
class hpcflow.sdk.core.element.ElementInputFiles(element_iteration=None, element_action=None, element_action_run=None)#

Bases: _ElementPrefixedParameter

Parameters:
class hpcflow.sdk.core.element.ElementInputs(element_iteration=None, element_action=None, element_action_run=None)#

Bases: _ElementPrefixedParameter

Parameters:
class hpcflow.sdk.core.element.ElementIteration(index, element, data_idx, EARs_initialised, actions, global_idx, schema_parameters, loop_idx)#

Bases: object

Parameters:
  • index (int) –

  • element (Element) –

  • data_idx (Dict) –

  • EARs_initialised (bool) –

  • actions (List[Dict]) –

  • global_idx (int) –

  • schema_parameters (List[str]) –

  • loop_idx (Dict) –

property EARs_initialised#

Whether or not the EARs have been initialised.

property action_runs: List[ElementActionRun]#

Get a list of element action runs, where only the final run is taken for each element action.

property actions: Dict[ElementAction]#
property data_idx#

The overall element iteration data index, before resolution of EARs.

property element#
get(path=None, action_idx=None, run_idx=-1, default=None, raise_on_missing=False)#

Get element data from the persistent store.

Parameters:
  • path (str | None) –

  • action_idx (int | None) –

  • run_idx (int) –

  • default (Any | None) –

  • raise_on_missing (bool) –

Return type:

Any

get_EAR_dependencies(as_objects=False)#

Get EARs that this element iteration depends on (excluding EARs of this element iteration).

Parameters:

as_objects (Optional[bool]) –

Return type:

List[Union[EAR_ID, ElementActionRun]]

get_data_idx(path=None, action_idx=None, run_idx=-1)#
Parameters:
  • action_idx (int | None) – The index of the action within the schema.

  • path (str | None) –

  • run_idx (int) –

Return type:

Dict[str, int]

get_dependent_EARs(as_objects=False)#

Get EARs of downstream iterations and tasks that depend on this element iteration.

Parameters:

as_objects (bool) –

Return type:

List[Union[EAR_ID, ElementActionRun]]

get_dependent_element_iterations(as_objects=False)#

Get element iterations of downstream iterations and tasks that depend on this element iteration.

Parameters:

as_objects (bool) –

Return type:

List[IterationID | ElementIteration]

get_dependent_elements(as_objects=False)#

Get elements of downstream tasks that depend on this element iteration.

Parameters:

as_objects (bool) –

Return type:

List[ElementID | Element]

get_dependent_tasks(as_objects=False)#

Get downstream tasks that depend on this element iteration.

Parameters:

as_objects (bool) –

Return type:

List[Union[int, WorkflowTask]]

get_element_dependencies(as_objects=False)#

Get elements that this element iteration depends on.

Parameters:

as_objects (bool | None) –

Return type:

List[ElementID | Element]

get_element_iteration_dependencies(as_objects=False)#

Get element iterations that this element iteration depends on.

Parameters:

as_objects (bool) –

Return type:

List[IterationID | ElementIteration]

get_input_dependencies()#

Get locally defined inputs/sequences/defaults from other tasks that this element iteration depends on.

Return type:

Dict[str, Dict]

get_parameter_names(prefix)#
Parameters:

prefix (str) –

Return type:

List[str]

get_parameter_sources(path=None, action_idx=None, run_idx=-1, typ=None, as_strings=False, use_task_index=False)#
Parameters:
  • use_task_index (bool) – If True, use the task index within the workflow, rather than the task insert ID.

  • path (str | None) –

  • action_idx (int | None) –

  • run_idx (int) –

  • typ (str | None) –

  • as_strings (bool) –

Return type:

Dict[str, str | Dict[str, Any]]

get_task_dependencies(as_objects=False)#

Get tasks (insert ID or WorkflowTask objects) that this element iteration depends on.

Dependencies may come from either elements from upstream tasks, or from locally defined inputs/sequences/defaults from upstream tasks.

Parameters:

as_objects (bool) –

Return type:

List[Union[int, WorkflowTask]]

property global_idx: int#
property index#
property input_files: ElementInputFiles#
property inputs: ElementInputs#
property iteration_ID#
property loop_idx: Dict[str, int]#
property output_files: ElementOutputFiles#
property outputs: ElementOutputs#
property schema_parameters: List[str]#
property task#
property workflow#
class hpcflow.sdk.core.element.ElementOutputFiles(element_iteration=None, element_action=None, element_action_run=None)#

Bases: _ElementPrefixedParameter

Parameters:
class hpcflow.sdk.core.element.ElementOutputs(element_iteration=None, element_action=None, element_action_run=None)#

Bases: _ElementPrefixedParameter

Parameters:
class hpcflow.sdk.core.element.ElementParameter(task: 'WorkflowTask', path: 'str', parent: 'Union[Element, ElementAction, ElementActionRun, Parameters]', element: 'Element', data_idx: 'Dict[str, int]')#

Bases: object

Parameters:
data_idx: Dict[str, int]#
property data_idx_is_set#
element: Element#
get_size(**store_kwargs)#
property is_set#
parent: Element | ElementAction | ElementActionRun | Parameters#
path: str#
task: WorkflowTask#
property value#
class hpcflow.sdk.core.element.ElementRepeats(number: 'int', where: 'Optional[ElementFilter]' = None)#

Bases: object

Parameters:
number: int#
where: ElementFilter | None = None#
class hpcflow.sdk.core.element.ElementResources(scratch: 'str' = None, num_cores: 'int' = None, scheduler: 'str' = None, shell: 'str' = None, use_job_array: 'bool' = None, time_limit: 'str' = None, scheduler_options: 'Dict' = None, scheduler_args: 'Dict' = None, shell_args: 'Dict' = None, os_name: 'str' = None)#

Bases: JSONLike

Parameters:
  • scratch (str) –

  • num_cores (int) –

  • scheduler (str) –

  • shell (str) –

  • use_job_array (bool) –

  • time_limit (str) –

  • scheduler_options (Dict) –

  • scheduler_args (Dict) –

  • shell_args (Dict) –

  • os_name (str) –

get_jobscript_hash()#

Get hash from all arguments that distinguish jobscripts.

num_cores: int = None#
os_name: str = None#
scheduler: str = None#
scheduler_args: Dict = None#
scheduler_options: Dict = None#
scratch: str = None#
shell: str = None#
shell_args: Dict = None#
time_limit: str = None#
use_job_array: bool = None#

hpcflow.sdk.core.environment module#

class hpcflow.sdk.core.environment.Environment(name, setup=None, specifiers=None, executables=None, _hash_value=None)#

Bases: JSONLike

class hpcflow.sdk.core.environment.Executable(label, instances)#

Bases: JSONLike

Parameters:
property environment#
filter_instances(parallel_mode=None, num_cores=None)#
class hpcflow.sdk.core.environment.ExecutableInstance(parallel_mode: str, num_cores: Any, command: str)#

Bases: JSONLike

Parameters:
  • parallel_mode (str) –

  • num_cores (Any) –

  • command (str) –

command: str#
classmethod from_spec(spec)#
num_cores: Any#
parallel_mode: str#
class hpcflow.sdk.core.environment.NumCores(start: int, stop: int, step: int = None)#

Bases: JSONLike

Parameters:
  • start (int) –

  • stop (int) –

  • step (int) –

start: int#
step: int = None#
stop: int#

hpcflow.sdk.core.errors module#

exception hpcflow.sdk.core.errors.DuplicateExecutableError#

Bases: ValueError

exception hpcflow.sdk.core.errors.EnvironmentSpecValidationError#

Bases: Exception

exception hpcflow.sdk.core.errors.ExtraInputs(message, extra_inputs)#

Bases: Exception

Return type:

None

exception hpcflow.sdk.core.errors.FileSpecValidationError#

Bases: Exception

exception hpcflow.sdk.core.errors.FromSpecMissingObjectError#

Bases: Exception

exception hpcflow.sdk.core.errors.InputSourceValidationError#

Bases: Exception

exception hpcflow.sdk.core.errors.InputValueDuplicateSequenceAddress#

Bases: ValueError

exception hpcflow.sdk.core.errors.InvalidIdentifier#

Bases: ValueError

exception hpcflow.sdk.core.errors.InvalidInputSourceTaskReference#

Bases: Exception

exception hpcflow.sdk.core.errors.JobscriptSubmissionFailure(message, submit_cmd, js_idx, js_path, stdout, stderr, subprocess_exc, job_ID_parse_exc)#

Bases: RuntimeError

Return type:

None

exception hpcflow.sdk.core.errors.LoopAlreadyExistsError#

Bases: Exception

exception hpcflow.sdk.core.errors.MalformedParameterPathError#

Bases: ValueError

exception hpcflow.sdk.core.errors.MissingActionEnvironment#

Bases: Exception

exception hpcflow.sdk.core.errors.MissingCompatibleActionEnvironment#

Bases: Exception

exception hpcflow.sdk.core.errors.MissingInputs(message, missing_inputs)#

Bases: Exception

Return type:

None

exception hpcflow.sdk.core.errors.ParameterSpecValidationError#

Bases: Exception

exception hpcflow.sdk.core.errors.SchedulerVersionsFailure(message)#

Bases: RuntimeError

We couldn’t get the scheduler and/or shell versions.

exception hpcflow.sdk.core.errors.SubmissionFailure(message)#

Bases: RuntimeError

Return type:

None

exception hpcflow.sdk.core.errors.TaskSchemaMissingParameterError#

Bases: Exception

exception hpcflow.sdk.core.errors.TaskSchemaSpecValidationError#

Bases: Exception

exception hpcflow.sdk.core.errors.TaskTemplateInvalidNesting#

Bases: ValueError

exception hpcflow.sdk.core.errors.TaskTemplateMultipleInputValues#

Bases: ValueError

exception hpcflow.sdk.core.errors.TaskTemplateMultipleSchemaObjectives#

Bases: ValueError

exception hpcflow.sdk.core.errors.TaskTemplateUnexpectedInput#

Bases: ValueError

exception hpcflow.sdk.core.errors.TaskTemplateUnexpectedSequenceInput#

Bases: ValueError

exception hpcflow.sdk.core.errors.ToJSONLikeChildReferenceError#

Bases: Exception

exception hpcflow.sdk.core.errors.UnknownResourceSpecItemError#

Bases: ValueError

exception hpcflow.sdk.core.errors.UnrequiredInputSources(message, unrequired_sources)#

Bases: ValueError

Return type:

None

exception hpcflow.sdk.core.errors.UnsetParameterDataError#

Bases: Exception

exception hpcflow.sdk.core.errors.UnsupportedShellError#

Bases: ValueError

We don’t support this shell on this OS.

exception hpcflow.sdk.core.errors.ValuesAlreadyPersistentError#

Bases: Exception

exception hpcflow.sdk.core.errors.WorkflowBatchUpdateFailedError#

Bases: Exception

exception hpcflow.sdk.core.errors.WorkflowLimitsError#

Bases: ValueError

exception hpcflow.sdk.core.errors.WorkflowNotFoundError#

Bases: Exception

exception hpcflow.sdk.core.errors.WorkflowParameterMissingError#

Bases: AttributeError

exception hpcflow.sdk.core.errors.WorkflowSpecValidationError#

Bases: Exception

exception hpcflow.sdk.core.errors.WorkflowSubmissionFailure#

Bases: RuntimeError

hpcflow.sdk.core.json_like module#

class hpcflow.sdk.core.json_like.BaseJSONLike#

Bases: object

_class_namespace#

Namespace whose attributes include the class definitions that might be referenced (and so require instantiation) in child objects.

Type:

namespace

_shared_data_namespace#

Namespace whose attributes include the shared data that might be referenced in child objects.

Type:

namespace

classmethod from_json_like(json_like, shared_data=None)#
Parameters:
  • json_like

  • shared_data (dict of (str: ObjectList)) –

to_dict()#
to_json_like(dct=None, shared_data=None, exclude=None, path=None)#
class hpcflow.sdk.core.json_like.ChildObjectSpec(name: str, class_name: str | None = None, class_obj: Type | None = None, json_like_name: str | None = None, is_multiple: bool | None = False, dict_key_attr: str | None = None, dict_val_attr: str | None = None, parent_ref: str | None = None, is_single_attribute: bool | None = False, is_enum: bool | None = False, is_dict_values: bool | None = False, is_dict_values_ensure_list: bool | None = False, shared_data_name: str | None = None, shared_data_primary_key: str | None = None)#

Bases: object

Parameters:
  • name (str) –

  • class_name (str | None) –

  • class_obj (Type | None) –

  • json_like_name (str | None) –

  • is_multiple (bool | None) –

  • dict_key_attr (str | None) –

  • dict_val_attr (str | None) –

  • parent_ref (str | None) –

  • is_single_attribute (bool | None) –

  • is_enum (bool | None) –

  • is_dict_values (bool | None) –

  • is_dict_values_ensure_list (bool | None) –

  • shared_data_name (str | None) –

  • shared_data_primary_key (str | None) –

class_name: str | None = None#
class_obj: Type | None = None#
dict_key_attr: str | None = None#
dict_val_attr: str | None = None#
is_dict_values: bool | None = False#
is_dict_values_ensure_list: bool | None = False#
is_enum: bool | None = False#
is_multiple: bool | None = False#
is_single_attribute: bool | None = False#
json_like_name: str | None = None#
name: str#
parent_ref: str | None = None#
shared_data_name: str | None = None#
shared_data_primary_key: str | None = None#
class hpcflow.sdk.core.json_like.JSONLike#

Bases: BaseJSONLike

BaseJSONLike, where the class namespace is the App instance.

to_dict()#
hpcflow.sdk.core.json_like.to_json_like(obj, shared_data=None, parent_refs=None, path=None)#

hpcflow.sdk.core.loop module#

class hpcflow.sdk.core.loop.Loop(tasks, num_iterations, name=None, non_iterable_parameters=None)#

Bases: JSONLike

Parameters:
  • tasks (List[Union[int, WorkflowTask]]) –

  • num_iterations (int) –

  • name (Optional[str]) –

  • non_iterable_parameters (Optional[List[str]]) –

property name#
property non_iterable_parameters#
property num_iterations#
property task_insert_IDs: Tuple[int]#

Get the list of task insert_IDs that define the extent of the loop.

property task_objects: Tuple[WorkflowTask]#
to_dict()#
property workflow_template#
class hpcflow.sdk.core.loop.WorkflowLoop(index, workflow, template, num_added_iterations, iterable_parameters)#

Bases: object

Class to represent a Loop that is bound to a Workflow.

Parameters:
  • index (int) –

  • workflow (Workflow) –

  • template (Loop) –

  • num_added_iterations (int) –

  • iterable_parameters (Dict[int:List[int, List[int]]]) –

add_iteration(parent_loop_indices=None)#
get_child_loops()#

Get loops whose task subset is a subset of this loop’s task subset. If two loops have identical task subsets, the first loop in the workflow loop index is considered the parent.

Return type:

List[WorkflowLoop]

get_parent_loops()#

Get loops whose task subset is a superset of this loop’s task subset. If two loops have identical task subsets, the first loop in the workflow loop index is considered the parent.

Return type:

List[WorkflowLoop]

property index#
property iterable_parameters#
property name#
classmethod new_empty_loop(index, workflow, template)#
Parameters:
  • index (int) –

  • workflow (Workflow) –

  • template (Loop) –

property num_added_iterations#
property num_iterations#
property task_indices: Tuple[int]#

Get the list of task indices that define the extent of the loop.

property task_insert_IDs#
property task_objects#
property template#
property workflow#

hpcflow.sdk.core.object_list module#

class hpcflow.sdk.core.object_list.AppDataList(_objects, access_attribute, descriptor=None)#

Bases: DotAccessObjectList

classmethod from_json_like(json_like, shared_data=None, is_hashed=False)#
Parameters:

is_hashed (bool, optional) – If True, accept a dict whose keys are hashes of the dict values.

to_dict()#
class hpcflow.sdk.core.object_list.CommandFilesList(_objects)#

Bases: AppDataList

A list-like container for command files with dot-notation access by label.

class hpcflow.sdk.core.object_list.DotAccessObjectList(_objects, access_attribute, descriptor=None)#

Bases: ObjectList

Provide dot-notation access via an access attribute for the case where the access attribute uniquely identifies a single object.

add_object(obj, index=-1, skip_duplicates=False)#
add_objects(objs, index=-1, skip_duplicates=False)#
get(access_attribute_value=None, **kwargs)#

Get a single object from the object list, by specifying the value of the access attribute, and optionally additional keyword-argument attribute values.

get_all(access_attribute_value=None, **kwargs)#

Get one or more objects from the object list, by specifying the value of the access attribute, and optionally additional keyword-argument attribute values.

class hpcflow.sdk.core.object_list.EnvironmentsList(_objects)#

Bases: AppDataList

A list-like container for environments with dot-notation access by name.

class hpcflow.sdk.core.object_list.ExecutablesList(_objects)#

Bases: AppDataList

A list-like container for environment executables with dot-notation access by executable label.

environment = None#
class hpcflow.sdk.core.object_list.GroupList(_objects)#

Bases: AppDataList

A list-like container for the task schema group list with dot-notation access by group name.

class hpcflow.sdk.core.object_list.ObjectList(objects, descriptor=None)#

Bases: JSONLike

A list-like class that provides item access via a get method according to attributes or dict-keys.

add_object(obj, index=-1, skip_duplicates=False)#
get(**kwargs)#

Get a single object from the object list, by specifying the value of the access attribute, and optionally additional keyword-argument attribute values.

get_all(**kwargs)#

Get one or more objects from the object list, by specifying the value of the access attribute, and optionally additional keyword-argument attribute values.

list_attrs()#

Get a tuple of the unique access-attribute values of the constituent objects.

class hpcflow.sdk.core.object_list.ParametersList(_objects)#

Bases: AppDataList

A list-like container for parameters with dot-notation access by parameter type.

class hpcflow.sdk.core.object_list.ResourceList(_objects)#

Bases: ObjectList

property element_set#
to_json_like(dct=None, shared_data=None, exclude=None, path=None)#

Overridden to write out as a dict keyed by action scope (like as can be specified in the input YAML) instead of list.

property workflow_template#
class hpcflow.sdk.core.object_list.TaskList(_objects)#

Bases: AppDataList

A list-like container for a task-like list with dot-notation access by task unique-name.

class hpcflow.sdk.core.object_list.TaskSchemasList(_objects)#

Bases: AppDataList

A list-like container for a task schema list with dot-notation access by task schema unique-name.

class hpcflow.sdk.core.object_list.TaskTemplateList(_objects)#

Bases: AppDataList

A list-like container for a task-like list with dot-notation access by task unique-name.

class hpcflow.sdk.core.object_list.WorkflowLoopList(_objects)#

Bases: DotAccessObjectList

class hpcflow.sdk.core.object_list.WorkflowTaskList(_objects)#

Bases: DotAccessObjectList

add_object(obj, index=-1)#
hpcflow.sdk.core.object_list.index(obj_lst, obj)#

hpcflow.sdk.core.parameters module#

class hpcflow.sdk.core.parameters.AbstractInputValue#

Bases: JSONLike

Class to represent all sequence-able inputs to a task.

make_persistent(workflow, source)#

Save value to a persistent workflow.

Parameters:

workflow (Workflow) –

Returns:

String is the data path for this task input and single item integer list contains the index of the parameter data Zarr group where the data is stored.

Return type:

(str, list of int)

to_dict()#
property value#
property workflow#
class hpcflow.sdk.core.parameters.BuiltinSchemaParameter#

Bases: object

class hpcflow.sdk.core.parameters.InputSource(source_type, import_ref=None, task_ref=None, task_source_type=None, element_iters=None, path=None, where=None)#

Bases: JSONLike

classmethod default()#
classmethod from_json_like(json_like, shared_data=None)#
Parameters:
  • json_like

  • shared_data (dict of (str: ObjectList)) –

classmethod from_string(str_defn)#
get_task(workflow)#

If source_type is task, then return the referenced task from the given workflow.

classmethod import_(import_ref)#
is_in(other_input_sources)#

Check if this input source is in a list of other input sources, without considering the element_iters attribute.

Parameters:

other_input_sources (List[InputSource]) –

Return type:

None | int

classmethod local()#
classmethod task(task_ref, task_source_type=None, element_iters=None)#
to_string()#
class hpcflow.sdk.core.parameters.InputSourceType(value)#

Bases: Enum

An enumeration.

DEFAULT = 2#
IMPORT = 0#
LOCAL = 1#
TASK = 3#
class hpcflow.sdk.core.parameters.InputValue(parameter, value=None, value_class_method=None, path=None)#

Bases: AbstractInputValue

Parameters:
  • parameter (Union[Parameter, str]) –

  • value (Optional[Any]) –

  • value_class_method (Optional[str]) –

  • path (Optional[str]) –

classmethod from_json_like(json_like, shared_data=None)#
Parameters:
  • json_like

  • shared_data (dict of (str: ObjectList)) –

property is_sub_value#

True if the value is for a sub part of the parameter (i.e. if path is set). Sub-values are not added to the base parameter data, but are interpreted as single-value sequences.

property normalised_inputs_path#
property normalised_path#
class hpcflow.sdk.core.parameters.Parameter(typ: 'str', is_file: 'bool' = False, sub_parameters: 'List[SubParameter]' = <factory>, _value_class: 'Any' = None, _hash_value: 'Optional[str]' = None)#

Bases: JSONLike

Parameters:
  • typ (str) –

  • is_file (bool) –

  • sub_parameters (List[SubParameter]) –

  • _value_class (Any) –

  • _hash_value (str | None) –

is_file: bool = False#
sub_parameters: List[SubParameter]#
to_dict()#
typ: str#
class hpcflow.sdk.core.parameters.ParameterPath(path: 'Sequence[Union[str, int, float]]', task: 'Optional[Union[TaskTemplate, TaskSchema]]' = None)#

Bases: JSONLike

Parameters:
  • path (Sequence[Union[str, int, float]]) –

  • task (Optional[Union[TaskTemplate, TaskSchema]]) –

path: Sequence[str | int | float]#
task: TaskTemplate | TaskSchema | None = None#
class hpcflow.sdk.core.parameters.ParameterPropagationMode(value)#

Bases: Enum

An enumeration.

EXPLICIT = 1#
IMPLICIT = 0#
NEVER = 2#
class hpcflow.sdk.core.parameters.ParameterValue#

Bases: object

to_dict()#
class hpcflow.sdk.core.parameters.ResourceSpec(scope=None, scratch=None, num_cores=None, scheduler=None, shell=None, use_job_array=None, time_limit=None, scheduler_options=None, scheduler_args=None, shell_args=None, os_name=None)#

Bases: JSONLike

Parameters:
  • scope (ActionScope) –

  • scratch (Optional[str]) –

  • num_cores (Optional[int]) –

  • scheduler (Optional[str]) –

  • shell (Optional[str]) –

  • use_job_array (Optional[bool]) –

  • time_limit (Optional[Union[str, timedelta]]) –

  • scheduler_options (Optional[Dict]) –

  • scheduler_args (Optional[Dict]) –

  • shell_args (Optional[Dict]) –

  • os_name (Optional[str]) –

ALLOWED_PARAMETERS = {'num_cores', 'os_name', 'scheduler', 'scheduler_args', 'scheduler_options', 'scratch', 'shell', 'shell_args', 'time_limit', 'use_job_array'}#
property element_set#
make_persistent(workflow, source)#

Save to a persistent workflow.

Parameters:

workflow (Workflow) –

Returns:

String is the data path for this task input and integer list contains the indices of the parameter data Zarr groups where the data is stored.

Return type:

(str, list of int)

property normalised_path#
property normalised_resources_path#
property num_cores#
property os_name#
property scheduler#
property scheduler_args#
property scheduler_options#
property scratch#
property shell#
property shell_args#
property time_limit#
to_dict()#
property use_job_array#
property workflow#
property workflow_template#
class hpcflow.sdk.core.parameters.SchemaInput(parameter, default_value=None, propagation_mode=ParameterPropagationMode.IMPLICIT, group=None, where=None)#

Bases: SchemaParameter

A Parameter as used within a particular schema, for which a default value may be applied.

Parameters:
default_value: InputValue | None = None#
group: str | None = None#
property input_or_output#
parameter: Parameter#
propagation_mode: ParameterPropagationMode = 0#
property task_schema#
where: ElementFilter | None = None#
class hpcflow.sdk.core.parameters.SchemaOutput(parameter, propagation_mode=ParameterPropagationMode.IMPLICIT)#

Bases: SchemaParameter

A Parameter as outputted from particular task.

Parameters:
property input_or_output#
parameter: Parameter#
propagation_mode: ParameterPropagationMode = 0#
class hpcflow.sdk.core.parameters.SchemaParameter#

Bases: JSONLike

property name#
property typ#
class hpcflow.sdk.core.parameters.SubParameter(address: 'Address', parameter: 'Parameter')#

Bases: object

Parameters:
  • address (List[int | float | str]) –

  • parameter (Parameter) –

address: List[int | float | str]#
parameter: Parameter#
class hpcflow.sdk.core.parameters.TaskSourceType(value)#

Bases: Enum

An enumeration.

ANY = 2#
INPUT = 0#
OUTPUT = 1#
class hpcflow.sdk.core.parameters.ValuePerturbation(name: 'str', path: 'Optional[Sequence[Union[str, int, float]]]' = None, multiplicative_factor: 'Optional[Numeric]' = 1, additive_factor: 'Optional[Numeric]' = 0)#

Bases: AbstractInputValue

Parameters:
  • name (str) –

  • path (Sequence[str | int | float] | None) –

  • multiplicative_factor (int | float | number | None) –

  • additive_factor (int | float | number | None) –

additive_factor: int | float | number | None = 0#
classmethod from_spec(spec)#
multiplicative_factor: int | float | number | None = 1#
name: str#
path: Sequence[str | int | float] | None = None#
class hpcflow.sdk.core.parameters.ValueSequence(path, nesting_order, values, is_unused=False)#

Bases: JSONLike

Parameters:
  • path (str) –

  • nesting_order (int) –

  • values (List[Any]) –

  • is_unused (bool) –

classmethod from_linear_space(start, stop, nesting_order, num=50, path=None, **kwargs)#
classmethod from_range(start, stop, nesting_order, step=1, path=None)#
property input_path#
property input_type#
property is_sub_value#

True if the values are for a sub part of the parameter.

make_persistent(workflow, source)#

Save value to a persistent workflow.

Return type:

Tuple[str, List[int], bool]

property normalised_inputs_path#

Return the normalised path without the “inputs” prefix, if the sequence is an inputs sequence, else return None.

property normalised_path#
property parameter#
property path_split#
property path_type#
property resource_scope#
to_dict()#
property values#
property workflow#

hpcflow.sdk.core.task module#

class hpcflow.sdk.core.task.ElementPropagation(task, nesting_order=None)#

Bases: object

Class to represent how a newly added element set should propagate to a given downstream task.

Parameters:
  • task (Task) –

  • nesting_order (Dict | None) –

property element_set#
nesting_order: Dict | None = None#
task: Task#
class hpcflow.sdk.core.task.ElementSet(inputs=None, input_files=None, sequences=None, resources=None, repeats=1, input_sources=None, nesting_order=None, sourceable_elem_iters=None, allow_non_coincident_task_sources=False)#

Bases: JSONLike

Class to represent a parametrisation of a new set of elements.

Parameters:
  • inputs (Optional[List[InputValue]]) –

  • input_files (Optional[List[InputFile]]) –

  • sequences (Optional[List[ValueSequence]]) –

  • resources (Optional[Dict[str, Dict]]) –

  • repeats (Optional[Union[int, List[int]]]) –

  • input_sources (Optional[Dict[str, InputSource]]) –

  • nesting_order (Optional[List]) –

  • sourceable_elem_iters (Optional[List[int]]) –

  • allow_non_coincident_task_sources (Optional[bool]) –

property defined_input_types#
property elem_iter_global_indices#
property element_iterations#
property element_local_idx_range#

Used to retrieve elements belonging to this element set.

property elements#
classmethod ensure_element_sets(inputs=None, input_files=None, sequences=None, resources=None, repeats=None, input_sources=None, nesting_order=None, element_sets=None, sourceable_elem_iters=None)#
get_defined_parameter_types()#
get_defined_sub_parameter_types()#
get_locally_defined_inputs()#
get_sequence_by_path(path)#
get_sequence_from_path(sequence_path)#
get_task_dependencies(as_objects=False)#

Get upstream tasks that this element set depends on.

property index#
property input_types#
is_input_type_provided(typ)#

Check if an input is provided locally as an InputValue or a ValueSequence.

Parameters:

typ (str) –

Return type:

bool

prepare_persistent_copy()#

Return a copy of self, which will then be made persistent, and save copies of attributes that may be changed during integration with the workflow.

property task#
property task_template#
to_dict()#
property undefined_input_types#
class hpcflow.sdk.core.task.Elements(task)#

Bases: object

Parameters:

task (WorkflowTask) –

islice(start=None, end=None)#
Parameters:
  • start (int) –

  • end (int) –

Return type:

Iterator[Element]

property task#
class hpcflow.sdk.core.task.InputStatus(has_default, is_required, is_provided)#

Bases: object

Information about a given schema input and its parametrisation within an element set.

Parameters:
  • has_default (bool) – True if a default value is available.

  • is_required (bool) – True if the input is required by one or more actions. An input may not be required if it is only used in the generation of inputs files, and those input files are passed to the element set directly.

  • is_provided (bool) – True if the input is locally provided in the element set.

has_default: bool#
property is_extra#

Return True if the input is provided but not required.

is_provided: bool#
is_required: bool#
class hpcflow.sdk.core.task.Parameters(task: 'WorkflowTask', path: 'str', return_element_parameters: 'bool', raise_on_missing: 'Optional[bool]' = False, default: 'Optional[Any]' = None)#

Bases: object

Parameters:
  • task (WorkflowTask) –

  • path (str) –

  • return_element_parameters (bool) –

  • raise_on_missing (bool | None) –

  • default (Any | None) –

default: Any | None = None#
islice(start=None, end=None)#
path: str#
raise_on_missing: bool | None = False#
return_element_parameters: bool#
task: WorkflowTask#
class hpcflow.sdk.core.task.Task(schemas, repeats=None, resources=None, inputs=None, input_files=None, sequences=None, input_sources=None, nesting_order=None, element_sets=None, sourceable_elem_iters=None)#

Bases: JSONLike

Parametrisation of an isolated task for which a subset of input values are given “locally”. The remaining input values are expected to be satisfied by other tasks/imports in the workflow.

Parameters:
  • schemas (Union[TaskSchema, str, List[TaskSchema], List[str]]) –

  • repeats (Optional[Union[int, List[int]]]) –

  • resources (Optional[Dict[str, Dict]]) –

  • inputs (Optional[List[InputValue]]) –

  • input_files (Optional[List[InputFile]]) –

  • sequences (Optional[List[ValueSequence]]) –

  • input_sources (Optional[Dict[str, InputSource]]) –

  • nesting_order (Optional[List]) –

  • element_sets (Optional[List[ElementSet]]) –

  • sourceable_elem_iters (Optional[List[int]]) –

add_group(name, where, group_by_distinct)#
Parameters:
all_schema_actions()#
Return type:

Iterator[Tuple[int, Action]]

property all_schema_input_normalised_paths#
property all_schema_input_types#

Get the set of all schema input types (over all specified schemas).

property all_schema_inputs: Tuple[SchemaInput]#
property all_schema_output_types#

Get the set of all schema output types (over all specified schemas).

property all_schema_outputs: Tuple[SchemaOutput]#
property all_sequences_normalised_paths#
property all_sourced_normalised_paths#
property all_used_sequences_normalised_paths#
property defined_input_types#
property dir_name#

Artefact directory name.

property element_sets#
get_all_required_schema_inputs(element_set)#
get_available_task_input_sources(element_set, source_tasks=None)#

For each input parameter of this task, generate a list of possible input sources that derive from inputs or outputs of this and other provided tasks.

Note this only produces a subset of available input sources for each input parameter; other available input sources may exist from workflow imports.

Parameters:
Return type:

List[InputSource]

get_input_statuses(elem_set)#

Get a dict whose keys are normalised input paths (without the “inputs” prefix), and whose values are InputStatus objects.

Parameters:

elem_set (ElementSet) – The element set for which input statuses should be returned.

Return type:

Dict[str, InputStatus]

get_non_sub_parameter_input_values()#
get_param_provided_element_sets(typ)#

Get the element set indices of this task for which a specified parameter type is locally provided.

Parameters:

typ (str) –

Return type:

List[int]

get_schema_action(idx)#
get_sub_parameter_input_values()#
static get_task_unique_names(tasks)#

Get the unique name of each in a list of tasks.

Return type:

list of str

Parameters:

tasks (List[Task]) –

property index#
property insert_ID#
is_input_type_required(typ, element_set)#

Check if a given input type must be specified in the parametrisation of this element set.

A schema input need not be specified if it is only required to generate an input file, and that input file is passed directly.

Parameters:
Return type:

bool

property name#
property non_universal_input_types#

Get input types for each schema that are non-universal.

property num_all_schema_actions: int#
property num_element_sets#
property objective#
prepare_element_resolution(element_set, input_data_indices)#
property provides_parameters#
property schemas#
set_sequence_parameters(element_set)#
to_dict()#
to_persistent(workflow, insert_ID)#

Return a copy where any schema input defaults are saved to a persistent workflow. Element set data is not made persistent.

property undefined_input_types#
property undefined_inputs#
property universal_input_types#

Get input types that are associated with all schemas

property unsourced_inputs#

Get schema input types for which no input sources are currently specified.

class hpcflow.sdk.core.task.TaskInputParameters(task)#

Bases: object

For retrieving schema input parameters across all elements.

Parameters:

task (WorkflowTask) –

task: WorkflowTask#
class hpcflow.sdk.core.task.TaskOutputParameters(task)#

Bases: object

For retrieving schema output parameters across all elements.

Parameters:

task (WorkflowTask) –

task: WorkflowTask#
class hpcflow.sdk.core.task.WorkflowTask(workflow, template, index, num_elements, num_element_iterations, num_EARs)#

Bases: object

Class to represent a Task that is bound to a Workflow.

Parameters:
  • workflow (Workflow) –

  • template (Task) –

  • index (int) –

  • num_elements (int) –

  • num_element_iterations (int) –

  • num_EARs (int) –

add_elements(base_element=None, inputs=None, input_files=None, sequences=None, resources=None, repeats=None, input_sources=None, nesting_order=None, element_sets=None, sourceable_elem_iters=None, propagate_to=None, return_indices=False)#
property dir_name#
property downstream_tasks#

Get all workflow tasks that are downstream from this task.

property elements#
ensure_input_sources(element_set)#

Check valid input sources are specified for a new task to be added to the workflow in a given position. If none are specified, set them according to the default behaviour.

generate_new_elements(input_data_indices, output_data_indices, element_data_indices, sequence_indices, source_indices)#
get(path, raise_on_missing=False, default=None)#
get_all_element_iterations()#
get_dependent_elements(as_objects=False)#

Get elements from downstream tasks (ElementID or Element objects) that depend on this task.

Parameters:

as_objects (bool) –

Return type:

List[Union[ElementID, Element]]

get_dependent_tasks(as_objects=False)#

Get tasks (insert ID or WorkflowTask objects) that depend on this task.

Parameters:

as_objects (bool) –

Return type:

List[int | WorkflowTask]

get_dir_name(loop_idx=None)#
Parameters:

loop_idx (Dict[str, int] | None) –

Return type:

str

get_element_dependencies(as_objects=False)#

Get elements from upstream tasks (ElementID or Element objects) that this task depends on.

Parameters:

as_objects (bool) –

Return type:

List[Union[ElementID, Element]]

get_task_dependencies(as_objects=False)#

Get tasks (insert ID or WorkflowTask objects) that this task depends on.

Dependencies may come from either elements from upstream tasks, or from locally defined inputs/sequences/defaults from upstream tasks.

Parameters:

as_objects (bool) –

Return type:

List[int | WorkflowTask]

property index#
initialise_EARs()#

Try to initialise any uninitialised EARs of this task.

Return type:

List[int]

property inputs#
property insert_ID#
property name#
classmethod new_empty_task(workflow, template, index)#
Parameters:
  • workflow (Workflow) –

  • template (Task) –

  • index (int) –

property num_EARs#
property num_actions#
property num_element_iterations#
property num_element_sets#
property num_elements#
property outputs#
static resolve_element_data_indices(multiplicities)#

Find the index of the Zarr parameter group index list corresponding to each input data for all elements.

# TODO: update docstring; shouldn’t reference Zarr.

Parameters:

multiplicities (list of dict) –

Each list item represents a sequence of values with keys:

multiplicity: int nesting_order: int path : str

Returns:

element_data_idx – Each list item is a dict representing a single task element and whose keys are input data paths and whose values are indices that index the values of the dict returned by the task.make_persistent method.

Return type:

list of dict

property template#
test_action_rule(act_rule, data_idx)#
Parameters:
Return type:

bool

property unique_name#
property upstream_tasks#

Get all workflow tasks that are upstream from this task.

property workflow#

hpcflow.sdk.core.task_schema module#

class hpcflow.sdk.core.task_schema.TaskObjective(name: str)#

Bases: JSONLike

Parameters:

name (str) –

name: str#
class hpcflow.sdk.core.task_schema.TaskSchema(objective, actions, method=None, implementation=None, inputs=None, outputs=None, version=None, _hash_value=None)#

Bases: JSONLike

Parameters:
classmethod get_by_key(key)#

Get a config-loaded task schema from a key.

get_key()#
get_parameter_dependence(parameter)#

Find if/where a given parameter is used by the schema’s actions.

Parameters:

parameter (SchemaParameter) –

classmethod ignore_invalid_actions()#
property input_types#
make_persistent(workflow, source)#
Return type:

List[int]

property name#
property output_types#
property provides_parameters#
property task_template#

hpcflow.sdk.core.test_utils module#

hpcflow.sdk.core.test_utils.make_actions(ins_outs)#
Parameters:

ins_outs (List[Tuple[Tuple | str, str]]) –

Return type:

List[Action]

hpcflow.sdk.core.test_utils.make_parameters(num)#
hpcflow.sdk.core.test_utils.make_schemas(ins_outs, ret_list=False)#
hpcflow.sdk.core.test_utils.make_tasks(schemas_spec, local_inputs=None, local_sequences=None, local_resources=None, nesting_orders=None)#
hpcflow.sdk.core.test_utils.make_workflow(schemas_spec, path, local_inputs=None, local_sequences=None, local_resources=None, nesting_orders=None, resources=None, name='w1', overwrite=False, store='zarr')#

hpcflow.sdk.core.utils module#

class hpcflow.sdk.core.utils.PrettyPrinter#

Bases: object

class hpcflow.sdk.core.utils.Singleton#

Bases: type

hpcflow.sdk.core.utils.bisect_slice(selection, len_A)#

Given two sequences (the first of which of known length), get the two slices that are equivalent to a given slice if the two sequences were combined.

Parameters:
  • selection (slice) –

  • len_A (int) –

hpcflow.sdk.core.utils.capitalise_first_letter(chars)#
hpcflow.sdk.core.utils.check_in_object_list(spec_name, spec_pos=1, obj_list_pos=2)#

Decorator factory for the various from_spec class methods that have attributes that should be replaced by an object from an object list.

hpcflow.sdk.core.utils.check_valid_py_identifier(name)#

Check a string is (roughly) a valid Python variable identifier and if so return it in lower-case.

Notes

Will be used for:
  • task objective name

  • task method

  • task implementation

  • parameter type

  • parameter name

  • loop name

  • element group name

class hpcflow.sdk.core.utils.classproperty(f)#

Bases: object

hpcflow.sdk.core.utils.ensure_in(item, lst)#

Get the index of an item in a list and append the item if it is not in the list.

hpcflow.sdk.core.utils.get_duplicate_items(lst)#

Get a list of all items in an iterable that appear more than once, assuming items are hashable.

Examples

>>> get_duplicate_items([1, 1, 2, 3])
[1]
>>> get_duplicate_items([1, 2, 3])
[]
>>> get_duplicate_items([1, 2, 3, 3, 3, 2])
[2, 3, 2]
hpcflow.sdk.core.utils.get_in_container(cont, path, cast_indices=False)#
hpcflow.sdk.core.utils.get_item_repeat_index(lst, distinguish_singular=False, item_callable=None)#

Get the repeat index for each item in a list.

Parameters:
  • lst (list) – Must contain hashable items, or hashable objects that are returned via callable called on each item.

  • distinguish_singular (bool, optional) – If True, items that are not repeated will have a repeat index of 0, and items that are repeated will have repeat indices starting from 1.

  • item_callable (callable, optional) – If specified, comparisons are made on the output of this callable on each item.

Returns:

repeat_idx – Repeat indices of each item (see distinguish_singular for details).

Return type:

list of int

hpcflow.sdk.core.utils.get_md5_hash(obj)#
hpcflow.sdk.core.utils.get_nested_indices(idx, size, nest_levels, raise_on_rollover=False)#

Generate the set of nested indices of length n that correspond to a global idx.

Examples

>>> for i in range(4**2): print(get_nested_indices(i, nest_levels=2, size=4))
[0, 0]
[0, 1]
[0, 2]
[0, 3]
[1, 0]
[1, 1]
[1, 2]
[1, 3]
[2, 0]
[2, 1]
[2, 2]
[2, 3]
[3, 0]
[3, 1]
[3, 2]
[3, 3]
>>> for i in range(4**3): print(get_nested_indices(i, nest_levels=3, size=4))
[0, 0, 0]
[0, 0, 1]
[0, 0, 2]
[0, 0, 3]
[0, 1, 0]
   ...
[3, 2, 3]
[3, 3, 0]
[3, 3, 1]
[3, 3, 2]
[3, 3, 3]
hpcflow.sdk.core.utils.get_process_stamp()#
hpcflow.sdk.core.utils.get_relative_path(path1, path2)#

Get relative path components between two paths.

Parameters:
  • path1 (tuple of (str or int or float) of length N) –

  • path2 (tuple of (str or int or float) of length less than or equal to N) –

Returns:

relative_path – The path components in path1 that are not in path2.

Return type:

tuple of (str or int or float)

Raises:

ValueError – If the two paths do not share a common ancestor of path components, or if path2 is longer than path1.

Notes

This function behaves like a simplified PurePath(*path1).relative_to(PurePath(*path2)) from the pathlib module, but where path components can include non-strings.

Examples

>>> get_relative_path(('A', 'B', 'C'), ('A',))
('B', 'C')
>>> get_relative_path(('A', 'B'), ('A', 'B'))
()
hpcflow.sdk.core.utils.get_time_stamp()#
hpcflow.sdk.core.utils.group_by_dict_key_values(lst, *keys)#

Group a list of dicts according to specified equivalent key-values.

Parameters:
  • lst (list of dict) – The list of dicts to group together.

  • keys (tuple) – Dicts that have identical values for all of these keys will be grouped together into a sub-list.

Returns:

grouped

Return type:

list of list of dict

Examples

>>> group_by_dict_key_values([{'a': 1}, {'a': 2}, {'a': 1}], 'a')
[[{'a': 1}, {'a': 1}], [{'a': 2}]]
hpcflow.sdk.core.utils.list_to_dict(lst, exclude=None)#
hpcflow.sdk.core.utils.load_config(func)#

API function decorator to ensure the configuration has been loaded, and load if not.

hpcflow.sdk.core.utils.make_workflow_id()#
hpcflow.sdk.core.utils.read_JSON_file(path)#
hpcflow.sdk.core.utils.read_JSON_string(string)#
Parameters:

string (str) –

hpcflow.sdk.core.utils.read_YAML(loadable_yaml)#
hpcflow.sdk.core.utils.read_YAML_file(path)#
Parameters:

path (PathLike) –

hpcflow.sdk.core.utils.remove_ansi_escape_sequences(string)#
hpcflow.sdk.core.utils.replace_items(lst, start, end, repl)#

Replace a range of items in a list with items in another list.

hpcflow.sdk.core.utils.search_dir_files_by_regex(pattern, group=0, directory='.')#
hpcflow.sdk.core.utils.sentry_wrap(name, transaction_op=None, span_op=None)#
hpcflow.sdk.core.utils.set_in_container(cont, path, value, ensure_path=False)#

hpcflow.sdk.core.validation module#

hpcflow.sdk.core.validation.get_schema(filename)#

Get a valid Schema object from the embedded data directory.

hpcflow.sdk.core.workflow module#

class hpcflow.sdk.core.workflow.Workflow(path)#

Bases: object

Class to represent a persistent hpcflow workflow.

Parameters:

path (PathLike) –

add_loop(loop, parent_loop_indices=None)#

Add a loop to a subset of workflow tasks.

Parameters:
  • loop (Loop) –

  • parent_loop_indices (Dict) –

Return type:

None

add_submission(JS_parallelism=None)#
Parameters:

JS_parallelism (Optional[bool]) –

Return type:

Submission

add_task(task, new_index=None)#
Parameters:
  • task (Task) –

  • new_index (int | None) –

Return type:

None

add_task_after(new_task, task_ref=None)#

Adds the given new_task after the task specified in task_ref.

Parameters:
  • new_task (Task, required.) –

  • task_ref (Task, optional.) – If no task_ref is given, the new task will be added at the end.

add_task_before(new_task, task_ref=None)#

Adds the given new_task before the task specified in task_ref.

Parameters:
  • new_task (Task, required.) –

  • task_ref (Task, optional.) – If no task_ref is given, the new task will be added at the beginning.

property artifacts_path#
batch_update(is_workflow_creation=False)#

A context manager that batches up structural changes to the workflow and commits them to disk all together when the context manager exits.

Parameters:

is_workflow_creation (bool) –

Return type:

Iterator[None]

check_parameters_exist(indices)#
Parameters:

indices (int | List[int]) –

Return type:

bool | List[bool]

copy(path=None)#

Copy the workflow to a new path and return the copied workflow.

Return type:

Workflow

property creation_info#
delete()#
elements()#
Return type:

Iterator[Element]

classmethod from_JSON_file(JSON_path, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a JSON file.

Parameters:
  • JSON_path (PathLike) – The path to a workflow template in the JSON file format.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_JSON_string(JSON_str, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a JSON string.

Parameters:
  • JSON_str (PathLike) – The JSON string containing a workflow template parametrisation.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_YAML_file(YAML_path, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a YAML file.

Parameters:
  • YAML_path (PathLike) – The path to a workflow template in the YAML file format.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_YAML_string(YAML_str, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a YAML string.

Parameters:
  • YAML_str (PathLike) – The YAML string containing a workflow template parametrisation.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_file(template_path, template_format=None, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from either a YAML or JSON file, depending on the file extension.

Parameters:
  • template_path (PathLike) – The path to a template file in YAML or JSON format, and with a “.yml”, “.yaml”, or “.json” extension.

  • template_format (str | None) – If specified, one of “json” or “yaml”. This forces parsing from a particular format regardless of the file extension.

  • path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_template(template, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from a WorkflowTemplate object.

Parameters:
  • template (WorkflowTemplate) – The WorkflowTemplate object to make persistent.

  • path (PathLike | None) – The directory in which the workflow will be generated. The current directory if not specified.

  • name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.

  • overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (str | None) – The persistent store to use for this workflow.

  • ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

classmethod from_template_data(template_name, tasks=Field(name=None, type=None, default=<dataclasses._MISSING_TYPE object>, default_factory=<function Workflow.<lambda>>, init=True, repr=True, hash=None, compare=True, metadata=mappingproxy({}), kw_only=<dataclasses._MISSING_TYPE object>, _field_type=None), loops=Field(name=None, type=None, default=<dataclasses._MISSING_TYPE object>, default_factory=<function Workflow.<lambda>>, init=True, repr=True, hash=None, compare=True, metadata=mappingproxy({}), kw_only=<dataclasses._MISSING_TYPE object>, _field_type=None), resources=None, path=None, workflow_name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None)#

Generate from the data associated with a WorkflowTemplate object.

Parameters:
  • template_name (str) – Name of the new workflow template, from which the new workflow will be generated.

  • tasks (Optional[List[Task]]) – List of Task objects to add to the new workflow.

  • loops (Optional[List[Loop]]) – List of Loop objects to add to the new workflow.

  • resources (Optional[Dict[str, Dict]]) – Mapping of action scopes to resource requirements, to be applied to all element sets in the workflow. Resources specified in an element set take precedence over those defined here for the whole workflow.

  • path (Optional[PathLike]) – The directory in which the workflow will be generated. The current directory if not specified.

  • workflow_name (Optional[str]) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified template_name will be used, in combination with a date-timestamp.

  • overwrite (Optional[bool]) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.

  • store (Optional[str]) – The persistent store to use for this workflow.

  • ts_fmt (Optional[str]) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.

  • ts_name_fmt (Optional[str]) – The datetime format to use when generating the workflow name, where it includes a timestamp.

Return type:

Workflow

get_EARs_from_IDs(indices)#

Return element action run objects from a list of five-tuples, representing the task insert ID, element index, iteration index, action index, and run index, respectively.

Parameters:

indices (List[EAR_ID]) –

Return type:

List[ElementActionRun]

get_all_parameter_data()#
Return type:

Dict[int, Any]

get_element_iterations_from_IDs(indices)#

Return element iteration objects from a list of three-tuples, representing the task insert ID, element index, and iteration index, respectively.

Parameters:

indices (List[IterationID]) –

Return type:

List[ElementIteration]

get_elements_from_IDs(indices)#

Return element objects from a list of two-tuples, representing the task insert ID, and element index, respectively.

Parameters:

indices (List[ElementID]) –

Return type:

List[Element]

get_iteration_task_pathway()#
get_parameter_data(index)#
Parameters:

index (int) –

Return type:

Tuple[bool, Any]

get_parameter_source(index)#
Parameters:

index (int) –

Return type:

Dict

get_task_elements(task, selection)#
Parameters:
  • task (Task) –

  • selection (slice) –

Return type:

List[Element]

get_task_elements_islice(task, selection)#
Parameters:
  • task (Task) –

  • selection (slice) –

Return type:

Iterator[Element]

get_task_unique_names(map_to_insert_ID=False)#

Return the unique names of all workflow tasks.

Parameters:

map_to_insert_ID (bool, optional) – If True, return a dict whose values are task insert IDs, otherwise return a list.

Return type:

List[str] | Dict[str, int]

is_parameter_set(index)#
Parameters:

index (int) –

Return type:

bool

property loops: WorkflowLoopList#
property name#

The workflow name may be different from the template name, as it includes the creation date-timestamp if generated.

property num_added_tasks: int#
property num_element_iterations: int#
property num_elements: int#
property num_loops: int#
property num_submissions: int#
property num_tasks: int#
rename(new_name)#
Parameters:

new_name (str) –

resolve_jobscripts()#
Return type:

List[Jobscript]

save_parameter(name, value, submission_idx, jobscript_idx, JS_element_idx, JS_action_idx)#
Parameters:
  • submission_idx (int) –

  • jobscript_idx (int) –

  • JS_element_idx (int) –

  • JS_action_idx (int) –

save_parameters(values, submission_idx, jobscript_idx, JS_element_idx, JS_action_idx)#

Save multiple parameters to a given EAR.

Parameters:
  • values (Dict) –

  • submission_idx (int) –

  • jobscript_idx (int) –

  • JS_element_idx (int) –

  • JS_action_idx (int) –

set_EAR_end(submission_idx, jobscript_idx, JS_element_idx, JS_action_idx)#

Set the end time on an EAR.

Parameters:
  • submission_idx (int) –

  • jobscript_idx (int) –

  • JS_element_idx (int) –

  • JS_action_idx (int) –

Return type:

None

set_EAR_start(submission_idx, jobscript_idx, JS_element_idx, JS_action_idx)#

Set the start time on an EAR.

Parameters:
  • submission_idx (int) –

  • jobscript_idx (int) –

  • JS_element_idx (int) –

  • JS_action_idx (int) –

Return type:

None

set_EAR_submission_indices(sub_idx, EAR_indices)#

Set the submission index on an EAR.

Parameters:
  • sub_idx (int) –

  • EAR_indices (Tuple[int, int, int, int]) –

Return type:

None

show_all_EAR_statuses()#
property store_format#
property submissions: List[Submission]#
property submissions_path#
submit(ignore_errors=False, JS_parallelism=None, print_stdout=False)#
Parameters:
  • ignore_errors (bool | None) –

  • JS_parallelism (bool | None) –

  • print_stdout (bool | None) –

Return type:

None

property task_artifacts_path#
property tasks: WorkflowTaskList#
property template: WorkflowTemplate#
property template_components: Dict#
property ts_fmt#
property ts_name_fmt#
write_commands(submission_idx, jobscript_idx, JS_element_idx, JS_action_idx)#

Write run-time commands for a given EAR.

Parameters:
  • submission_idx (int) –

  • jobscript_idx (int) –

  • JS_element_idx (int) –

  • JS_action_idx (int) –

Return type:

None

class hpcflow.sdk.core.workflow.WorkflowBlueprint(workflow_template)#

Bases: object

Pre-built workflow templates that are simpler to parametrise (e.g. fitting workflows).

Parameters:

workflow_template (WorkflowTemplate) –

workflow_template: WorkflowTemplate#
class hpcflow.sdk.core.workflow.WorkflowTemplate(name, tasks=<factory>, loops=<factory>, workflow=None, resources=None)#

Bases: JSONLike

Class to represent initial parametrisation of a hpcflow workflow, with limited validation logic.

Parameters:
  • name (str) –

  • tasks (Optional[List[Task]]) –

  • loops (Optional[List[Loop]]) –

  • workflow (Optional[Workflow]) –

  • resources (Optional[Dict[str, Dict]]) –

classmethod from_JSON_file(path)#

Load from a JSON file.

Parameters:

path (PathLike) – The path to the JSON file containing the workflow template parametrisation.

Return type:

WorkflowTemplate

classmethod from_JSON_string(string)#

Load from a JSON string.

Parameters:

string (str) – The JSON string containing the workflow template parametrisation.

Return type:

WorkflowTemplate

classmethod from_YAML_file(path)#

Load from a YAML file.

Parameters:

path (PathLike) – The path to the YAML file containing the workflow template parametrisation.

Return type:

WorkflowTemplate

classmethod from_YAML_string(string)#

Load from a YAML string.

Parameters:

string (str) – The YAML string containing the workflow template parametrisation.

Return type:

WorkflowTemplate

classmethod from_file(path, template_format='yaml')#

Load from either a YAML or JSON file, depending on the file extension.

Parameters:
  • path (PathLike) – The path to the file containing the workflow template parametrisation.

  • template_format (str | None) – The file format to expect at path. One of “json” or “yaml”, if specified. By default, “yaml”.

Return type:

WorkflowTemplate

loops: List[Loop] | None#
name: str#
resources: Dict[str, Dict] | None = None#
tasks: List[Task] | None#
workflow: Workflow | None = None#

hpcflow.sdk.core.zarr_io module#

class hpcflow.sdk.core.zarr_io.ZarrEncodable#

Bases: object

classmethod from_zarr(zarr_group, dataset_copy=False)#
to_dict()#
to_zarr(zarr_group)#
hpcflow.sdk.core.zarr_io.zarr_decode(param_data, arr_group, path=None, dataset_copy=False)#
Parameters:
  • param_data (None | Dict) –

  • arr_group (Group) –

hpcflow.sdk.core.zarr_io.zarr_encode(data, zarr_group, is_pending_add, is_set)#

Module contents#

Core programmatic models for hpcflow.