ralsei.graph
Package Contents
Classes
Pipeline | This is where you declare your tasks, which are later compiled into a DAG
DAG | A graph of tasks
TreePath | Tuple subclass representing a path in a nested dictionary/pipeline
OutputOf | Stores the relative path from the root of the pipeline to be resolved later
NamedTask | Name and task pair
TaskSequence | An executable sequence of tasks
Functions
resolve | If value is OutputOf, resolve it. Otherwise, return value
Aliases
Tasks | A dictionary with task name to value pairs, used to define a Pipeline
Resolves | Either the value T or the OutputOf that resolves to that value
API
- class ralsei.graph.Pipeline
  Bases: abc.ABC
  This is where you declare your tasks, which are later compiled into a ralsei.graph.DAG.
  - abstract create_tasks() -> ralsei.graph.pipeline.Tasks
    - Returns:
      A dictionary with task name to value pairs, where the value can be:
      - A task definition (ralsei.task.TaskDef)
      - A nested ralsei.graph.Pipeline
      - A nested dictionary
  - outputof(*task_paths: str | ralsei.graph.path.TreePath) -> ralsei.graph.outputof.OutputOf
    Refer to the output of another task in this pipeline. The reference is resolved later.
    These dependencies are taken into account when deciding the order of task execution.
    - Parameters:
      - *task_paths: str | ralsei.graph.path.TreePath
        Path from the root of the pipeline, either as a string separated with "." or as a TreePath object. Multiple paths are allowed, but all tasks must have the same output. This is useful when depending on multiple AddColumnsSql tasks if both sets of columns are required.
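To illustrate how a dotted path addresses a task in a nested structure, and why multiple paths must agree on their output, here is a small standalone sketch. It does not use ralsei: the nested dictionary stands in for a pipeline, each leaf value stands in for a task's output (a table name), and `lookup` and `resolve_output` are hypothetical helpers, not part of the library.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def lookup(tree: Mapping[str, Any], path: str | tuple[str, ...]) -> Any:
    """Walk a nested mapping along a "."-separated string or a tuple of keys."""
    parts = tuple(path.split(".")) if isinstance(path, str) else path
    node: Any = tree
    for part in parts:
        node = node[part]
    return node


def resolve_output(tree: Mapping[str, Any], *paths: str | tuple[str, ...]) -> Any:
    """Resolve one or more paths; every path must lead to the same output."""
    if not paths:
        raise ValueError("at least one path is required")
    outputs = [lookup(tree, path) for path in paths]
    if any(output != outputs[0] for output in outputs[1:]):
        raise ValueError(f"paths resolve to different outputs: {outputs}")
    return outputs[0]


# Hypothetical pipeline: two column-adding tasks both extend the "people" table
tree = {
    "create": "people",
    "extend": {"name_cols": "people", "age_cols": "people"},
}
print(resolve_output(tree, "extend.name_cols", ("extend", "age_cols")))  # people
```

Mixing a string path and a tuple path in one call mirrors the `str | TreePath` parameter type. A mismatch between outputs is reported as an error rather than silently picking one.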
  - build_dag(env: ralsei.jinja.SqlEnvironment) -> ralsei.graph.dag.DAG
Resolve dependencies and generate a graph of tasks
- class ralsei.graph.Tasks
  - type Tasks = collections.abc.Mapping[str, ralsei.task.TaskDef | ralsei.graph.pipeline.Pipeline | ralsei.graph.pipeline.Tasks]
  A dictionary with task name to value pairs, used to define a Pipeline.
  Acceptable values:
  - A task definition (ralsei.task.TaskDef)
  - A nested Pipeline
  - A nested dictionary
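Nested dictionaries and nested pipelines both add a level to a task's path. Below is a minimal sketch of that flattening. It assumes that leaves are task definitions (plain strings here) and that a nested pipeline is anything exposing `create_tasks()`. `flatten_tasks` and `Reports` are hypothetical, and this is not the library's code.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def flatten_tasks(
    tasks: Mapping[str, Any], prefix: tuple[str, ...] = ()
) -> dict[tuple[str, ...], Any]:
    """Flatten a nested task mapping into {path tuple: task definition}."""
    flat: dict[tuple[str, ...], Any] = {}
    for name, value in tasks.items():
        path = (*prefix, name)
        if hasattr(value, "create_tasks"):
            # Nested pipeline: expand its own mapping under this name
            flat.update(flatten_tasks(value.create_tasks(), path))
        elif isinstance(value, Mapping):
            # Nested dictionary: recurse with the longer prefix
            flat.update(flatten_tasks(value, path))
        else:
            # Anything else is treated as a task definition (a leaf)
            flat[path] = value
    return flat


class Reports:
    """Hypothetical nested pipeline."""

    def create_tasks(self) -> dict[str, Any]:
        return {"daily": "daily_def"}


tasks = {"load": "load_def", "clean": {"users": "users_def"}, "reports": Reports()}
print(flatten_tasks(tasks))
# {('load',): 'load_def', ('clean', 'users'): 'users_def', ('reports', 'daily'): 'daily_def'}
```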
- class ralsei.graph.DAG
  A graph of tasks
  - tasks: dict[ralsei.graph.path.TreePath, ralsei.task.Task] = None
    All tasks by name
  - relations: dict[ralsei.graph.path.TreePath, set[ralsei.graph.path.TreePath]] = None
    "from -> to" relations (the left task is executed first)
  - tasks_str() -> dict[str, ralsei.task.Task]
  - topological_sort(constrain_starting_nodes: collections.abc.Iterable[ralsei.graph.path.TreePath] | None = None) -> ralsei.graph.sequence.TaskSequence
    Topological sort
    - Parameters:
      - constrain_starting_nodes: collections.abc.Iterable[ralsei.graph.path.TreePath] | None = None
        If set, filter out everything except these nodes and their descendants. Otherwise, perform a topological sort of the whole graph.
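As a rough model of what `constrain_starting_nodes` does, the sketch below keeps the starting nodes plus everything reachable from them through the from -> to relations. It then orders that subgraph with the standard library's `graphlib.TopologicalSorter`. Plain strings stand in for `TreePath` keys, and the node list stands in for the keys of `DAG.tasks`. This is an illustration under those assumptions, not ralsei's implementation.

```python
from __future__ import annotations

from collections.abc import Iterable, Mapping
from graphlib import TopologicalSorter


def descendants(relations: Mapping[str, set[str]], starts: Iterable[str]) -> set[str]:
    """Return the starting nodes and every node reachable from them."""
    seen: set[str] = set()
    stack = list(starts)
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(relations.get(node, ()))
    return seen


def topological_sort(
    nodes: Iterable[str],
    relations: Mapping[str, set[str]],
    constrain_starting_nodes: Iterable[str] | None = None,
) -> list[str]:
    keep = (
        set(nodes)
        if constrain_starting_nodes is None
        else descendants(relations, constrain_starting_nodes)
    )
    # graphlib expects node -> predecessors, while relations map from -> to
    predecessors: dict[str, set[str]] = {node: set() for node in keep}
    for src, dsts in relations.items():
        for dst in dsts:
            if src in keep and dst in keep:
                predecessors[dst].add(src)
    return list(TopologicalSorter(predecessors).static_order())


relations = {"extract": {"clean"}, "clean": {"report"}, "lookup": {"report"}}
nodes = ["extract", "clean", "lookup", "report"]
print(topological_sort(nodes, relations, constrain_starting_nodes=["clean"]))  # ['clean', 'report']
```

`graphlib` raises `graphlib.CycleError` if the relations contain a cycle, so the sketch never returns an order for a cyclic graph.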
  - sort_filtered(from_filters: collections.abc.Sequence[ralsei.graph.path.TreePath], single_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]) -> ralsei.graph.sequence.TaskSequence
    Perform a topological sort and apply a set of filters. See the example in the CLI section.
    The filters are combined as a union of both sets of tasks. If both filters are empty, the whole graph is returned.
    - Parameters:
      - from_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]
        Same as --from in the CLI; means “this task and its descendants”
      - single_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]
        Same as --one in the CLI; means “only this task”
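The union semantics can be sketched by sorting the whole graph once and then filtering that order. A topological order restricted to any subset of nodes is still a valid order for that subset. In the sketch, a node belongs to the `--from` selection if it is a `from_filters` entry or has a selected predecessor. The function name mirrors the method, but several pieces are assumptions rather than ralsei's code: the plain-string nodes, the dict-of-sets `relations` argument (from -> to, with every node present as a key), and the function body.

```python
from __future__ import annotations

from collections.abc import Mapping, Sequence
from graphlib import TopologicalSorter


def sort_filtered(
    relations: Mapping[str, set[str]],
    from_filters: Sequence[str],
    single_filters: Sequence[str],
) -> list[str]:
    # Invert from -> to into node -> predecessors for graphlib
    predecessors: dict[str, set[str]] = {node: set() for node in relations}
    for src, dsts in relations.items():
        for dst in dsts:
            predecessors.setdefault(dst, set()).add(src)
    order = list(TopologicalSorter(predecessors).static_order())
    if not from_filters and not single_filters:
        return order  # no filters: the whole graph

    # Walking in topological order visits every predecessor before its
    # successors, so one pass collects the from_filters and their descendants
    reached: set[str] = set()
    for node in order:
        if node in from_filters or predecessors[node] & reached:
            reached.add(node)
    selected = reached | set(single_filters)
    return [node for node in order if node in selected]


relations = {"extract": {"clean"}, "clean": {"report"}, "lookup": {"report"}, "report": set()}
print(sort_filtered(relations, from_filters=["clean"], single_filters=["lookup"]))
```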
  - graphviz() -> graphviz.Digraph
Generate graphviz diagram
- class ralsei.graph.TreePath
  Tuple subclass representing a path in a nested dictionary/pipeline
  - static parse(string: str) -> ralsei.graph.path.TreePath
    Parse from a "."-separated string
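Because `TreePath` is a tuple subclass, paths compare and hash like plain tuples. This is what lets them serve as keys of `DAG.tasks` and `DAG.relations`. Below is a minimal stand-in showing that behavior together with a `parse` counterpart. The class body, including the `__str__` override, is an assumption for illustration rather than ralsei's source.

```python
from __future__ import annotations


class TreePath(tuple):
    """Hypothetical stand-in: a tuple of keys addressing a nested pipeline."""

    @staticmethod
    def parse(string: str) -> TreePath:
        # "extract.users" -> TreePath(("extract", "users"))
        return TreePath(string.split("."))

    def __str__(self) -> str:
        return ".".join(self)


path = TreePath.parse("extract.users")
print(path == ("extract", "users"))  # True
print(str(path))  # extract.users
```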
- class ralsei.graph.OutputOf
Stores the relative path from the root of the pipeline to be resolved later
  - pipeline: ralsei.graph.pipeline.Pipeline = None
  - task_paths: list[ralsei.graph.path.TreePath] = None
    More than one path is permitted, but all tasks must have the same output. This is useful when depending on multiple AddColumnsSql tasks if both sets of columns are required.
  - __post_init__()
- class ralsei.graph.Resolves
  - type Resolves = ralsei.graph.outputof.T | ralsei.graph.outputof.OutputOf
  Either the value T or the OutputOf that resolves to that value
- ralsei.graph.resolve(env: ralsei.jinja.ISqlEnvironment, value: Any) -> Any
  If value is ralsei.graph.OutputOf, resolve it. Otherwise, return value unchanged.
  Can only be called from ralsei.task.TaskImpl.prepare().
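One way to get the behavior described for `resolve`, and the matching `ResolverContextError`, is a context variable that is set only while dependencies are being resolved. The sketch below is a simplified model under that assumption, and none of it is taken from ralsei's source:

- It drops the `env` argument.
- It gives `OutputOf` a single hypothetical `path` field.
- It uses a hypothetical `resolver_context` helper.
- It only checks the context when it actually has an `OutputOf` to resolve.

```python
from __future__ import annotations

import contextvars
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class OutputOf:
    path: str  # hypothetical: a single dotted path


# Generic alias in the spirit of Resolves: Resolves[str] is Union[str, OutputOf]
Resolves = Union[T, OutputOf]


class ResolverContextError(RuntimeError):
    pass


_lookup: contextvars.ContextVar[Callable[[OutputOf], Any]] = contextvars.ContextVar("lookup")


@contextmanager
def resolver_context(lookup: Callable[[OutputOf], Any]) -> Iterator[None]:
    """Make `lookup` available to resolve() for the duration of the block."""
    token = _lookup.set(lookup)
    try:
        yield
    finally:
        _lookup.reset(token)


def resolve(value: Resolves[Any]) -> Any:
    if not isinstance(value, OutputOf):
        return value  # plain values pass through unchanged
    try:
        lookup = _lookup.get()
    except LookupError:
        raise ResolverContextError("resolve() called outside dependency resolution") from None
    return lookup(value)


outputs = {"extract.users": "users_table"}
with resolver_context(lambda ref: outputs[ref.path]):
    print(resolve(OutputOf("extract.users")))  # users_table
    print(resolve(42))  # 42
```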
- exception ralsei.graph.ResolverContextError(*args: object)
  Bases: RuntimeError
  Raised if ralsei.graph.resolve() is called outside of a dependency resolution context.
- exception ralsei.graph.CyclicGraphError(*args: object)
  Bases: RuntimeError
  Raised if recursion (a dependency cycle) is detected during dependency resolution.
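Dependency resolution that walks task dependencies depth-first can detect a cycle when it reaches a task that is still on the current resolution stack. Below is a self-contained sketch of that technique. The local `CyclicGraphError` only mirrors the name of the real exception. The `deps` format (task -> tasks it depends on) and `resolution_order` are hypothetical.

```python
from __future__ import annotations

from collections.abc import Mapping


class CyclicGraphError(RuntimeError):
    """Local stand-in named after ralsei.graph.CyclicGraphError."""


def resolution_order(deps: Mapping[str, list[str]]) -> list[str]:
    """Depth-first resolution; returns tasks with dependencies first."""
    done: set[str] = set()
    in_progress: list[str] = []  # tasks currently being resolved
    order: list[str] = []

    def visit(name: str) -> None:
        if name in done:
            return
        if name in in_progress:
            # Reached a task that is still being resolved: report the cycle
            cycle = in_progress[in_progress.index(name):] + [name]
            raise CyclicGraphError(" -> ".join(cycle))
        in_progress.append(name)
        for dep in deps.get(name, []):
            visit(dep)
        in_progress.pop()
        done.add(name)
        order.append(name)

    for name in deps:
        visit(name)
    return order


print(resolution_order({"report": ["clean"], "clean": ["extract"], "extract": []}))
# ['extract', 'clean', 'report']
try:
    resolution_order({"a": ["b"], "b": ["a"]})
except CyclicGraphError as err:
    print(err)  # a -> b -> a
```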
- class ralsei.graph.NamedTask
  Name and task pair
  - path: ralsei.graph.path.TreePath = None
  - task: ralsei.task.Task = None
- class ralsei.graph.TaskSequence(steps: list[ralsei.graph.sequence.NamedTask])
  An executable sequence of tasks
  - run(conn: ralsei.connection.ConnectionExt)
    Run, committing after each successful task
  - delete(conn: ralsei.connection.ConnectionExt)
    Delete, committing after each successful task
  - redo(conn: ralsei.connection.ConnectionExt)
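Committing after each successful task means that a failure part-way through keeps the work of the tasks that already finished. The sketch below models `run` with the standard library's `sqlite3` in place of `ConnectionExt`. `SqlStep`, the local `NamedTask`, and `run_sequence` are hypothetical stand-ins, not ralsei classes.

```python
from __future__ import annotations

import sqlite3
from dataclasses import dataclass


@dataclass
class SqlStep:
    """Hypothetical task that executes one SQL statement."""

    sql: str

    def run(self, conn: sqlite3.Connection) -> None:
        conn.execute(self.sql)


@dataclass
class NamedTask:
    """Stand-in mirroring the documented path/task pair."""

    path: tuple[str, ...]
    task: SqlStep


def run_sequence(conn: sqlite3.Connection, steps: list[NamedTask]) -> None:
    for step in steps:
        step.task.run(conn)
        # Commit right after each success: a later failure cannot undo it
        conn.commit()


conn = sqlite3.connect(":memory:")
steps = [
    NamedTask(("create",), SqlStep("CREATE TABLE people (name TEXT)")),
    NamedTask(("seed",), SqlStep("INSERT INTO people VALUES ('Ralsei')")),
    NamedTask(("broken",), SqlStep("INSERT INTO missing_table VALUES (1)")),
]
try:
    run_sequence(conn, steps)
except sqlite3.OperationalError as err:
    conn.rollback()  # discards only the failed step's uncommitted work
    print("failed:", err)
print(conn.execute("SELECT COUNT(*) FROM people").fetchone()[0])  # 1
```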