ralsei.graph

Package Contents

Classes

Pipeline

This is where you declare your tasks, which are later compiled into a ralsei.graph.DAG

DAG

A graph of tasks

TreePath

Tuple subclass representing a path in nested dictionary/pipeline

OutputOf

Stores the relative path from the root of the pipeline to be resolved later

NamedTask

Name and task pair

TaskSequence

An executable sequence of tasks

Functions

resolve

If value is a ralsei.graph.OutputOf, resolves it. Otherwise, returns value

Aliases

Tasks

A dictionary with task name to value pairs, used to define a Pipeline

Resolves

Either the value T or the OutputOf that resolves to that value

API

class ralsei.graph.Pipeline

Bases: abc.ABC

This is where you declare your tasks, which are later compiled into a ralsei.graph.DAG

abstract create_tasks() ralsei.graph.pipeline.Tasks
Returns:

A dictionary with task name to value pairs, where the value can be a ralsei.task.TaskDef, a nested Pipeline, or a nested Tasks dictionary (see ralsei.graph.Tasks)

outputof(*task_paths: str | ralsei.graph.path.TreePath) ralsei.graph.outputof.OutputOf

Refer to the output of another task from this pipeline, that will later be resolved.

Dependencies are taken into account when deciding the order of task execution.

Parameters:
*task_paths: str | ralsei.graph.path.TreePath

Path from the root of the pipeline, given either as a string separated with . or as a TreePath object.

Multiple paths are allowed, but all tasks must have the same output. This is useful when depending on multiple AddColumnsSql tasks if both sets of columns are required.

build_dag(env: ralsei.jinja.SqlEnvironment) ralsei.graph.dag.DAG

Resolve dependencies and generate a graph of tasks

class ralsei.graph.Tasks
type Tasks = collections.abc.Mapping[str, ralsei.task.TaskDef | ralsei.graph.pipeline.Pipeline | ralsei.graph.pipeline.Tasks]

A dictionary with task name to value pairs, used to define a Pipeline

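Acceptable values, as given by the type alias above: a ralsei.task.TaskDef, a ralsei.graph.pipeline.Pipeline, or a nested Tasks dictionary.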
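
Because a Tasks dictionary can nest, while a DAG keys its tasks by path, it may help to see how a nested mapping flattens into path tuples. The following is a minimal standalone sketch, not ralsei's implementation: the flatten helper and the placeholder string values are hypothetical, and nested Pipeline objects are not handled.

```python
from collections.abc import Mapping


def flatten(tasks: Mapping[str, object], prefix: tuple[str, ...] = ()) -> dict[tuple[str, ...], object]:
    """Flatten a nested mapping into {path_tuple: leaf_value} pairs."""
    flat: dict[tuple[str, ...], object] = {}
    for name, value in tasks.items():
        path = prefix + (name,)
        if isinstance(value, Mapping):
            # A nested dictionary contributes its entries under the longer path.
            flat.update(flatten(value, path))
        else:
            # Anything else is treated as a leaf (a task definition in ralsei).
            flat[path] = value
    return flat


# Placeholder strings stand in for real task definitions.
nested = {
    "load": "task A",
    "clean": {"dedupe": "task B", "normalize": "task C"},
}
flat = flatten(nested)
```

Here the entry nested under "clean" ends up keyed by the two-element path ("clean", "dedupe"), which corresponds to the string form clean.dedupe.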

class ralsei.graph.DAG

A graph of tasks

tasks : dict[ralsei.graph.path.TreePath, ralsei.task.Task] = None

All tasks by name

relations : dict[ralsei.graph.path.TreePath, set[ralsei.graph.path.TreePath]] = None

from -> to relations (left task is executed first)

tasks_str() dict[str, ralsei.task.Task]

relations_str() dict[str, set[str]]

topological_sort(constrain_starting_nodes: collections.abc.Iterable[ralsei.graph.path.TreePath] | None = None) ralsei.graph.sequence.TaskSequence

Topological sort

Parameters:
constrain_starting_nodes: collections.abc.Iterable[ralsei.graph.path.TreePath] | None = None

If set, will filter out everything except these nodes and their descendants. Otherwise, perform topological sort on the whole graph
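
The effect of constrain_starting_nodes can be illustrated with a standalone sketch built on the standard library's graphlib. This is not ralsei's code: the function names are hypothetical, plain strings stand in for TreePath keys, and the relations argument only mirrors the from -> to shape described above.

```python
from graphlib import TopologicalSorter


def descendants(relations, start):
    """Return the start nodes plus everything reachable from them."""
    seen = set()
    stack = list(start)
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        stack.extend(relations.get(node, ()))
    return seen


def topological_order(relations, constrain_starting_nodes=None):
    """Order nodes so that every 'from' node precedes its 'to' nodes."""
    nodes = set(relations)
    for targets in relations.values():
        nodes |= set(targets)
    if constrain_starting_nodes is not None:
        # Keep only the starting nodes and their descendants.
        nodes = descendants(relations, constrain_starting_nodes)
    # graphlib expects node -> predecessors, so invert the from -> to mapping.
    predecessors = {node: set() for node in nodes}
    for source, targets in relations.items():
        if source not in nodes:
            continue
        for target in targets:
            if target in nodes:
                predecessors[target].add(source)
    return list(TopologicalSorter(predecessors).static_order())


relations = {"a": {"b", "c"}, "b": {"d"}, "c": {"d"}, "d": set()}
full = topological_order(relations)
only_b = topological_order(relations, ["b"])
```

With this graph, constraining the sort to b keeps only b and its descendant d, while the unconstrained call orders all four nodes with a first and d last.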

sort_filtered(from_filters: collections.abc.Sequence[ralsei.graph.path.TreePath], single_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]) ralsei.graph.sequence.TaskSequence

Perform topological sort and apply a set of filters. See example in the CLI section.

Filters are combined as a union of both sets of tasks. If both filters are empty, returns the whole graph.

Parameters:
from_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]

same as --from in the CLI, means “this task and its descendants”

single_filters: collections.abc.Sequence[ralsei.graph.path.TreePath]

same as --one in the CLI, means “only this task”
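
The union semantics of the two filters can be sketched as follows. This is a standalone illustration under stated assumptions, not the library's implementation: select_tasks is a hypothetical helper, strings stand in for TreePath values, and the already sorted list full_order is supplied by hand.

```python
def select_tasks(relations, from_filters, single_filters):
    """Return the set of nodes kept by the two filters."""
    all_nodes = set(relations).union(*relations.values())
    if not from_filters and not single_filters:
        # No filters at all: keep the whole graph.
        return all_nodes
    # "from" filters keep each listed node and all of its descendants.
    visited = set()
    stack = list(from_filters)
    while stack:
        node = stack.pop()
        if node in visited:
            continue
        visited.add(node)
        stack.extend(relations.get(node, ()))
    # "single" filters keep only the listed nodes; the result is the union.
    return visited | set(single_filters)


relations = {"a": {"b"}, "b": {"c"}, "c": set(), "d": set()}
full_order = ["a", "b", "c", "d"]  # assumed to be topologically sorted already

selected = select_tasks(relations, from_filters=["b"], single_filters=["d"])
filtered_order = [node for node in full_order if node in selected]
unfiltered = select_tasks(relations, from_filters=[], single_filters=[])
```

Filtering the sorted list afterwards preserves execution order, so b still runs before c even though a was dropped.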

graphviz() graphviz.Digraph

Generate graphviz diagram

class ralsei.graph.TreePath

Bases: tuple[str, ...]

Tuple subclass representing a path in nested dictionary/pipeline

__new__(*parts: str) Self
Parameters:
*parts: str

path elements, cannot contain .

static parse(string: str) ralsei.graph.path.TreePath

Parse from a . separated string
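
A tuple subclass with this interface might look roughly like the sketch below. PathSketch is a hypothetical stand-in written for illustration; in particular, raising ValueError for elements containing . is an assumption, since the reference only states that such elements are not allowed.

```python
class PathSketch(tuple):
    """Illustrative tuple subclass holding the parts of a dotted path."""

    def __new__(cls, *parts: str) -> "PathSketch":
        for part in parts:
            if "." in part:
                # Assumed behaviour: reject elements that contain the separator.
                raise ValueError(f"path element {part!r} must not contain '.'")
        return super().__new__(cls, parts)

    @staticmethod
    def parse(string: str) -> "PathSketch":
        """Build a path from a '.'-separated string."""
        return PathSketch(*string.split("."))

    def __str__(self) -> str:
        return ".".join(self)


path = PathSketch.parse("clean.dedupe")
text = str(path)
```

Because the class is still a tuple, instances compare equal to plain tuples and can be used directly as dictionary keys.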

__str__() str
__repr__() str

class ralsei.graph.OutputOf

Stores the relative path from the root of the pipeline to be resolved later

pipeline : ralsei.graph.pipeline.Pipeline = None
task_paths : list[ralsei.graph.path.TreePath] = None

More than one path is permitted, but all tasks must have the same output. This is useful when depending on multiple AddColumnsSql tasks if both sets of columns are required.

__post_init__()

class ralsei.graph.Resolves
type Resolves = ralsei.graph.outputof.T | ralsei.graph.outputof.OutputOf

Either the value T or the OutputOf that resolves to that value

ralsei.graph.resolve(env: ralsei.jinja.ISqlEnvironment, value: Any) Any

If value is a ralsei.graph.OutputOf, resolves it. Otherwise, returns value

Can only be called from ralsei.task.TaskImpl.prepare()
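
One common way to restrict a function to a particular calling context is a context variable that is set only while that context is active. The sketch below shows that general pattern and is not ralsei's actual mechanism: PlaceholderSketch, OutsideContextError, resolve_sketch and _current_resolver are all hypothetical names, the env parameter is omitted, and letting plain values pass through even outside the context is an assumption.

```python
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class PlaceholderSketch:
    """Stands in for a reference that is resolved later."""
    key: str


class OutsideContextError(RuntimeError):
    """Raised when a placeholder is resolved with no active resolver."""


# Set only while dependency resolution is in progress.
_current_resolver: ContextVar[Optional[Callable[[PlaceholderSketch], Any]]] = ContextVar(
    "_current_resolver", default=None
)


def resolve_sketch(value: Any) -> Any:
    """Resolve placeholders through the active resolver; pass other values through."""
    if not isinstance(value, PlaceholderSketch):
        return value
    resolver = _current_resolver.get()
    if resolver is None:
        raise OutsideContextError("resolve_sketch() called outside of a resolution context")
    return resolver(value)


outputs = {"clean.dedupe": "dedupe_table"}
token = _current_resolver.set(lambda placeholder: outputs[placeholder.key])
try:
    resolved = resolve_sketch(PlaceholderSketch("clean.dedupe"))
    passthrough = resolve_sketch(42)
finally:
    # Leaving the context makes further placeholder resolution fail again.
    _current_resolver.reset(token)
```

Resetting the token in a finally block keeps the resolver from leaking into unrelated code if resolution raises.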

exception ralsei.graph.ResolverContextError(*args: object)

Bases: RuntimeError

Occurs if ralsei.graph.resolve() is called outside of dependency resolution context

exception ralsei.graph.CyclicGraphError(*args: object)

Bases: RuntimeError

Occurs if recursion is detected during dependency resolution
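
Detecting recursion during dependency resolution is typically done by tracking which tasks are currently being resolved. The following standalone sketch shows the idea with a depth-first traversal; it is not taken from ralsei, and CycleSketchError, resolve_all and the dependencies mapping (task to the tasks it depends on) are hypothetical.

```python
class CycleSketchError(RuntimeError):
    """Raised when a task depends, directly or indirectly, on itself."""


def resolve_all(dependencies):
    """Return tasks in an order where dependencies come first."""
    done = []          # finished tasks, in resolution order
    done_set = set()
    in_progress = []   # tasks whose dependencies are being resolved right now

    def visit(task):
        if task in done_set:
            return
        if task in in_progress:
            # Reaching a task that is still in progress means a cycle.
            cycle = in_progress[in_progress.index(task):] + [task]
            raise CycleSketchError(" -> ".join(cycle))
        in_progress.append(task)
        for dependency in dependencies.get(task, ()):
            visit(dependency)
        in_progress.pop()
        done_set.add(task)
        done.append(task)

    for task in dependencies:
        visit(task)
    return done


order = resolve_all({"a": [], "b": ["a"], "c": ["b"]})

try:
    resolve_all({"a": ["b"], "b": ["a"]})
    cycle_message = None
except CycleSketchError as error:
    cycle_message = str(error)
```

Reporting the full chain of tasks in the error message makes it easier to see which reference closes the loop.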

class ralsei.graph.NamedTask

Name and task pair

path : ralsei.graph.path.TreePath = None
task : ralsei.task.Task = None
property name : str

path as a string

class ralsei.graph.TaskSequence(steps: list[ralsei.graph.sequence.NamedTask])

An executable sequence of tasks

run(conn: ralsei.connection.ConnectionExt)

Run, committing after each successful task

delete(conn: ralsei.connection.ConnectionExt)

Delete, committing after each successful task

redo(conn: ralsei.connection.ConnectionExt)

Equivalent to delete() followed by run()
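
The commit-after-each-task behaviour can be sketched with stand-in objects. Everything here is hypothetical and independent of ralsei: FakeConnection only records calls, LoggingTask is a dummy task, and deleting in reverse order (so that dependents are removed before the tasks they depend on) is an assumption of this sketch that the reference does not state.

```python
class FakeConnection:
    """Records calls instead of talking to a database."""

    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")


class LoggingTask:
    """Dummy task that only writes to the connection's log."""

    def __init__(self, name):
        self.name = name

    def run(self, conn):
        conn.log.append(f"run {self.name}")

    def delete(self, conn):
        conn.log.append(f"delete {self.name}")


class SequenceSketch:
    def __init__(self, steps):
        self.steps = steps

    def run(self, conn):
        for task in self.steps:
            task.run(conn)
            conn.commit()  # commit after each successful task

    def delete(self, conn):
        # Assumption: undo in reverse order of execution.
        for task in reversed(self.steps):
            task.delete(conn)
            conn.commit()

    def redo(self, conn):
        self.delete(conn)
        self.run(conn)


conn = FakeConnection()
sequence = SequenceSketch([LoggingTask("a"), LoggingTask("b")])
sequence.redo(conn)
```

Committing per task means that if a later task raises, the work of the earlier tasks is already persisted and does not need to be repeated.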