ralsei.graph
Package Contents
Classes
Pipeline | This is where you declare your tasks, which are later compiled into a DAG
DAG | A graph of tasks
TreePath | Tuple subclass representing a path in a nested dictionary/pipeline
OutputOf | Stores the relative path from the root of the pipeline, to be resolved later
NamedTask | Name and task pair
TaskSequence | An executable sequence of tasks
Functions
resolve | If value is OutputOf, resolve it. Otherwise, return value unchanged
Aliases
Tasks | A dictionary with task name to value pairs, used to define a Pipeline
Resolves | Either the value T or the OutputOf that resolves to that value
API
- class ralsei.graph.Pipeline
  Bases: abc.ABC
  This is where you declare your tasks, which are later compiled into a ralsei.graph.DAG.
  - abstract create_tasks() → ralsei.graph.pipeline.Tasks
    Returns: a dictionary with task name to value pairs, where the value can be:
    - a task definition (ralsei.task.TaskDef)
    - a nested ralsei.graph.Pipeline
    - a nested dictionary
  - outputof(*task_paths: str | ralsei.graph.path.TreePath) → ralsei.graph.outputof.OutputOf
    Refer to the output of another task from this pipeline; the reference is resolved later.
    Dependencies are taken into account when deciding the order of task execution.
    Parameters:
    - *task_paths: str | ralsei.graph.path.TreePath
      Path from the root of the pipeline, either a .-separated string or a TreePath object.
      Multiple paths are allowed, but all tasks must have the same output. This is useful when depending on multiple AddColumnsSql tasks if both sets of columns are required.
  - build_dag(env: ralsei.jinja.SqlEnvironment) → ralsei.graph.dag.DAG
    Resolve dependencies and generate a graph of tasks.
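The deferred-reference idea behind outputof() can be illustrated with a small self-contained sketch. It does not use ralsei: DeferredOutput, outputof_sketch and resolve_output are hypothetical names, and a plain dict of task paths to outputs stands in for what build_dag() would resolve. It also assumes, as the parameter description suggests, that several AddColumnsSql tasks extending one table all report that table as their output.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DeferredOutput:
    """Hypothetical stand-in for OutputOf: it only records task paths."""

    task_paths: list[tuple[str, ...]]


def outputof_sketch(*task_paths: str | tuple[str, ...]) -> DeferredOutput:
    # Accept "a.b" strings or already-split tuples, mirroring str | TreePath
    if not task_paths:
        raise ValueError("at least one task path is required")
    paths = [tuple(p.split(".")) if isinstance(p, str) else tuple(p) for p in task_paths]
    return DeferredOutput(paths)


def resolve_output(ref: DeferredOutput, outputs: dict[tuple[str, ...], object]) -> object:
    # Look up every referenced task; all of them must produce the same output
    values = [outputs[path] for path in ref.task_paths]
    if any(value != values[0] for value in values[1:]):
        raise ValueError(f"tasks {ref.task_paths} have different outputs")
    return values[0]


# Two hypothetical tasks that both add columns to the same "places" table
outputs = {
    ("places", "table"): "places",
    ("places", "add_geo"): "places",
    ("places", "add_ratings"): "places",
}
ref = outputof_sketch("places.add_geo", ("places", "add_ratings"))
print(resolve_output(ref, outputs))  # places
```

In this sketch, storing paths instead of task objects means a reference can be created before the referenced task exists; the lookup happens only once every output is known.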
- class ralsei.graph.Tasks
  - type Tasks = collections.abc.Mapping[str, ralsei.task.TaskDef | ralsei.graph.pipeline.Pipeline | ralsei.graph.pipeline.Tasks]
  A dictionary with task name to value pairs, used to define a Pipeline. Acceptable values:
  - a task definition (ralsei.task.TaskDef)
  - a nested Pipeline
  - a nested dictionary
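To make the nesting rules concrete, here is a sketch of how a Tasks mapping could be flattened into path-keyed entries, similar in spirit to the TreePath keys of DAG.tasks. This is not ralsei's implementation: task definitions are plain strings here, SubPipeline is a hypothetical class, and anything with a create_tasks() method is treated as a nested pipeline.

```python
from __future__ import annotations

from collections.abc import Mapping


class SubPipeline:
    """Hypothetical nested pipeline returning its own task dictionary."""

    def create_tasks(self) -> Mapping[str, object]:
        return {"table": "CREATE TABLE ...", "add_geo": "ALTER TABLE ..."}


def flatten_tasks(
    tasks: Mapping[str, object], prefix: tuple[str, ...] = ()
) -> dict[tuple[str, ...], object]:
    flat: dict[tuple[str, ...], object] = {}
    for name, value in tasks.items():
        path = prefix + (name,)
        if hasattr(value, "create_tasks"):
            # Nested pipeline: recurse into the dictionary it declares
            flat.update(flatten_tasks(value.create_tasks(), path))
        elif isinstance(value, Mapping):
            # Nested dictionary: recurse with the extended prefix
            flat.update(flatten_tasks(value, path))
        else:
            # Anything else is treated as a task definition (a leaf)
            flat[path] = value
    return flat


tasks = {
    "load": "COPY ...",
    "places": SubPipeline(),
    "stats": {"daily": "INSERT ..."},
}
for path in flatten_tasks(tasks):
    print(".".join(path))
# load
# places.table
# places.add_geo
# stats.daily
```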
- class ralsei.graph.DAG
  A graph of tasks
  - tasks : dict[ralsei.graph.path.TreePath, ralsei.task.Task]
    All tasks by name
  - relations : dict[ralsei.graph.path.TreePath, set[ralsei.graph.path.TreePath]]
    from -> to relations (the left task is executed first)
  - tasks_str() → dict[str, ralsei.task.Task]
  - topological_sort(constrain_starting_nodes: collections.abc.Iterable[ralsei.graph.path.TreePath] | None = None) → ralsei.graph.sequence.TaskSequence
    Topological sort
    Parameters:
    - constrain_starting_nodes: collections.abc.Iterable[ralsei.graph.path.TreePath] | None = None
      If set, filter out everything except these nodes and their descendants. Otherwise, perform a topological sort of the whole graph.
  - sort_filtered(from_filters: collections.abc.Sequence[ralsei.graph.path.TreePath], single_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]) → ralsei.graph.sequence.TaskSequence
    Perform a topological sort and apply a set of filters. See the example in the CLI section.
    The filters are combined as a union of both sets of tasks. If both filters are empty, the whole graph is returned.
    Parameters:
    - from_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]
      Same as --from in the CLI: “this task and its descendants”
    - single_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]
      Same as --one in the CLI: “only this task”
  - graphviz() → graphviz.Digraph
    Generate a graphviz diagram
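The two sorting methods of DAG can be illustrated on a plain relations mapping that uses the same from -> to direction as DAG.relations. This is a sketch, not ralsei's code: paths are plain tuples, the functions return lists rather than a TaskSequence, and the use of Kahn's algorithm with alphabetical tie-breaking is an assumption made so the output is deterministic.

```python
from __future__ import annotations

from collections import deque
from collections.abc import Iterable, Sequence

Path = tuple[str, ...]


def reachable(relations: dict[Path, set[Path]], starts: Iterable[Path]) -> set[Path]:
    # The start nodes plus all of their descendants (following from -> to edges)
    seen = set(starts)
    queue = deque(seen)
    while queue:
        for child in relations.get(queue.popleft(), set()):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


def topological_sort(
    nodes: Iterable[Path],
    relations: dict[Path, set[Path]],
    constrain_starting_nodes: Iterable[Path] | None = None,
) -> list[Path]:
    keep = set(nodes)
    if constrain_starting_nodes is not None:
        keep &= reachable(relations, constrain_starting_nodes)
    # Kahn's algorithm: count incoming edges between kept nodes only
    indegree = {node: 0 for node in keep}
    for src, dsts in relations.items():
        for dst in dsts:
            if src in keep and dst in keep:
                indegree[dst] += 1
    ready = deque(sorted(node for node, deg in indegree.items() if deg == 0))
    order: list[Path] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in sorted(relations.get(node, set()) & keep):
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(keep):
        raise ValueError("the graph contains a cycle")
    return order


def sort_filtered(
    nodes: Iterable[Path],
    relations: dict[Path, set[Path]],
    from_filters: Sequence[Path],
    single_filters: Sequence[Path],
) -> list[Path]:
    order = topological_sort(nodes, relations)
    if not from_filters and not single_filters:
        return order  # no filters: the whole graph
    # Union of "--from" (task + descendants) and "--one" (just the task)
    selected = reachable(relations, from_filters) | set(single_filters)
    # A subsequence of a topological order is still a valid order
    return [node for node in order if node in selected]


a, b, c, d, e = ("a",), ("b",), ("c",), ("d",), ("e",)
relations = {a: {b, c}, b: {d}, c: {d}}
nodes = [a, b, c, d, e]
print(topological_sort(nodes, relations))  # [('a',), ('e',), ('b',), ('c',), ('d',)]
print(topological_sort(nodes, relations, [b]))  # [('b',), ('d',)]
print(sort_filtered(nodes, relations, [c], [e]))  # [('e',), ('c',), ('d',)]
```

Filtering the full order, rather than sorting only the selected tasks, keeps the sketch simple: dropping tasks from a valid topological order never places a task before one it depends on.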
- class ralsei.graph.TreePath
  Tuple subclass representing a path in a nested dictionary/pipeline
  Initialization
  Initialize self. See help(type(self)) for accurate signature.
  - static parse(string: str) → ralsei.graph.path.TreePath
    Parse from a .-separated string
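Because TreePath subclasses tuple, a path is hashable and can serve as a dictionary key, as in DAG.tasks. A rough sketch of the idea, using a hypothetical PathSketch class rather than ralsei's actual source; the __str__ method in particular is an assumption of this sketch, not documented behaviour of TreePath:

```python
class PathSketch(tuple):
    """Hypothetical tuple subclass illustrating the idea behind TreePath."""

    @staticmethod
    def parse(string: str) -> "PathSketch":
        # "places.add_geo" -> ("places", "add_geo")
        return PathSketch(string.split("."))

    def __str__(self) -> str:
        # Inverse of parse(): join the parts back with dots
        return ".".join(self)


path = PathSketch.parse("places.add_geo")
print(path == ("places", "add_geo"))  # True
print(str(path))  # places.add_geo

# Hashing and equality are inherited from tuple, so a plain tuple finds the same entry
registry = {path: "task object"}
print(registry[("places", "add_geo")])  # task object
```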
- class ralsei.graph.OutputOf
  Stores the relative path from the root of the pipeline, to be resolved later
  - pipeline : ralsei.graph.pipeline.Pipeline
  - task_paths : list[ralsei.graph.path.TreePath]
    More than one path is permitted, but all tasks must have the same output. This is useful when depending on multiple AddColumnsSql tasks if both sets of columns are required.
  - __post_init__()
- class ralsei.graph.Resolves
  - type Resolves = ralsei.graph.outputof.T | ralsei.graph.outputof.OutputOf
  Either the value T or the OutputOf that resolves to that value
- ralsei.graph.resolve(env: ralsei.jinja.ISqlEnvironment, value: Any) → Any
  If value is a ralsei.graph.OutputOf, resolve it. Otherwise, return value unchanged.
  Can only be called from ralsei.task.TaskImpl.prepare().
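The rule that resolve() only works during dependency resolution, and raises ResolverContextError otherwise, is the kind of guard a context variable provides. The sketch below is an assumption about how such a guard could work, not ralsei's implementation: Ref, resolving() and NotResolvingError are hypothetical names, and a dict of outputs replaces the env argument.

```python
from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any


class NotResolvingError(RuntimeError):
    """Hypothetical analogue of ResolverContextError."""


@dataclass(frozen=True)
class Ref:
    """Hypothetical deferred reference, standing in for OutputOf."""

    path: tuple[str, ...]


# Outputs visible to resolve(); unset outside of a resolution context
_outputs: ContextVar[dict[tuple[str, ...], Any]] = ContextVar("_outputs")


@contextmanager
def resolving(outputs: dict[tuple[str, ...], Any]) -> Iterator[None]:
    token = _outputs.set(outputs)
    try:
        yield
    finally:
        _outputs.reset(token)  # restore the previous (unset) state


def resolve(value: Any) -> Any:
    if not isinstance(value, Ref):
        return value  # ordinary values pass through unchanged
    try:
        outputs = _outputs.get()
    except LookupError:
        raise NotResolvingError("resolve() called outside of dependency resolution") from None
    return outputs[value.path]


print(resolve("plain value"))  # plain value
with resolving({("places", "table"): "places"}):
    print(resolve(Ref(("places", "table"))))  # places
```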
- exception ralsei.graph.ResolverContextError(*args: object)
  Bases: RuntimeError
  Raised if ralsei.graph.resolve() is called outside of the dependency resolution context.
  Initialization
- exception ralsei.graph.CyclicGraphError(*args: object)
  Bases: RuntimeError
  Raised if recursion (a dependency cycle) is detected during dependency resolution.
  Initialization
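CyclicGraphError covers the case where following task references leads back to a task that is still being resolved. A common way to detect this is a depth-first walk that tracks the tasks currently in progress. The sketch below uses that technique with hypothetical names (resolve_order, CycleError) and plain task names; it is not taken from ralsei.

```python
class CycleError(RuntimeError):
    """Hypothetical analogue of CyclicGraphError."""


def resolve_order(deps: dict[str, list[str]]) -> list[str]:
    # deps maps a task to the tasks whose outputs it references
    done: list[str] = []
    finished: set[str] = set()
    in_progress: set[str] = set()

    def visit(name: str) -> None:
        if name in finished:
            return
        if name in in_progress:
            # Re-entering a task that is still being resolved means a cycle
            raise CycleError(f"cycle detected at {name!r}")
        in_progress.add(name)
        for dep in deps.get(name, []):
            visit(dep)
        in_progress.remove(name)
        finished.add(name)
        done.append(name)  # dependencies are appended before their dependents

    for name in deps:
        visit(name)
    return done


print(resolve_order({"report": ["clean"], "clean": ["load"], "load": []}))
# ['load', 'clean', 'report']
try:
    resolve_order({"a": ["b"], "b": ["a"]})
except CycleError as err:
    print(err)  # cycle detected at 'a'
```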
- class ralsei.graph.NamedTask
  Name and task pair
  - path : ralsei.graph.path.TreePath
  - task : ralsei.task.Task
- class ralsei.graph.TaskSequence(steps: list[ralsei.graph.sequence.NamedTask])
  An executable sequence of tasks
  Initialization
  - run(conn: ralsei.connection.ConnectionExt)
    Run, committing after each successful task
  - delete(conn: ralsei.connection.ConnectionExt)
    Delete, committing after each successful task
  - redo(conn: ralsei.connection.ConnectionExt)