Pipelines

In Pipeline.create_tasks() you declare tasks and their dependencies, which, after resolution, become a ralsei.graph.DAG:

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "create": CreateTableSql(
                table=Table("records"),
                sql="""\
                CREATE TABLE {{table}}(
                    id INTEGER PRIMARY KEY,
                    first_name TEXT,
                    last_name TEXT
                )
                {%split%}
                INSERT INTO {{table}}(first_name, last_name)
                VALUES ('Fyodor', 'Dostoevsky')"""
            ),
            "update": AddColumnsSql(
                table=self.outputof("create"),
                columns=[Column("full_name")],
                sql="""\
                UPDATE {{table}}
                SET full_name = first_name || ' ' || last_name"""
            )
        }

Dependency resolution

Here, self.outputof() serves a double purpose: it resolves to the output table of the "create" task and signifies that "update" depends on "create", forming the following graph:

[Task graph: create → update]
  create: CREATE TABLE "records"(id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT)
  update: ALTER TABLE "records" ADD COLUMN "full_name" TEXT

ralsei.graph.OutputOf can be used in place of ralsei.types.Table:

  • In task constructor arguments, where explicitly allowed

  • In SQL templates, since everything placed inside a {{ value }} block is resolved automatically
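The automatic resolution inside template blocks can be pictured with a minimal, self-contained sketch. This is not ralsei's implementation: the Table and OutputOf classes, the TASK_OUTPUTS registry, and the render() helper below are all hypothetical stand-ins, and str.format substitution stands in for ralsei's Jinja-style {{ value }} blocks.

```python
# Minimal sketch (NOT ralsei internals): values handed to a template are
# resolved first, so an OutputOf reference and a concrete Table render
# identically.  Table, OutputOf, TASK_OUTPUTS and render() are hypothetical.
from dataclasses import dataclass

@dataclass
class Table:
    name: str

    def __str__(self) -> str:
        return f'"{self.name}"'

@dataclass
class OutputOf:
    task_name: str

# hypothetical registry: which table each task produces
TASK_OUTPUTS = {"create": Table("records")}

def resolve(value):
    """Replace an OutputOf reference with the table it points to."""
    return TASK_OUTPUTS[value.task_name] if isinstance(value, OutputOf) else value

def render(template: str, **params) -> str:
    """Resolve every parameter, then substitute (str.format stands in for Jinja)."""
    return template.format(**{k: str(resolve(v)) for k, v in params.items()})

# both calls produce: UPDATE "records" SET full_name = first_name
direct = render("UPDATE {table} SET full_name = first_name", table=Table("records"))
via_ref = render("UPDATE {table} SET full_name = first_name", table=OutputOf("create"))
assert direct == via_ref
```

The point of the sketch: task code never needs to care whether it was given a concrete table or a reference to another task's output.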

In some cases you may depend on two or more tasks that add columns to the same table, regardless of their order:

[Task graph: create → update1, create → update2, update1 → other, update2 → other]
  create:  CREATE TABLE "records"(...)
  update1: ALTER TABLE "records" ADD COLUMN "full_name" TEXT
  update2: ALTER TABLE "records" ADD COLUMN "rank" INT

Then, self.outputof("update1", "update2") will resolve to Table("records") and mark both of these tasks as dependencies.

Warning

When using outputof() with multiple arguments, all of them must resolve to the same table.
outputof("create", "update1") will throw an error.
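As a toy illustration of this rule (not ralsei's code): the TASK_OUTPUTS registry and output_of() helper below are hypothetical, and the "export" entry is an invented task writing to a different table, included only to show the failure case.

```python
# Hypothetical sketch (not ralsei internals): a multi-argument outputof()
# marks every listed task as a dependency, but they must all resolve to
# one and the same output table.
TASK_OUTPUTS = {
    "update1": "records",
    "update2": "records",
    "export": "export",  # invented task with a different output table
}

def output_of(*task_names: str) -> str:
    tables = {TASK_OUTPUTS[name] for name in task_names}
    if len(tables) != 1:
        raise ValueError(f"{task_names} resolve to different tables: {tables}")
    return tables.pop()

output_of("update1", "update2")   # resolves to "records"
# output_of("update1", "export") would raise ValueError: different tables
```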

Nested pipelines

You can also nest one pipeline inside another by including it in the dictionary:

"process": PipelineNested()

Then, its tasks will start with the process. prefix.
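The prefixing behaves like flattening a nested mapping of tasks into dotted paths. The flatten() helper below only illustrates the naming scheme, it is not ralsei's implementation:

```python
# Illustrative sketch (not ralsei internals): nesting a pipeline under a
# key prefixes all of its task names with that key plus a dot.
def flatten(tasks: dict, prefix: str = "") -> dict:
    flat = {}
    for name, value in tasks.items():
        path = f"{prefix}{name}"
        if isinstance(value, dict):  # stands in for a nested pipeline's tasks
            flat.update(flatten(value, f"{path}."))
        else:
            flat[path] = value
    return flat

tasks = flatten({
    "load": "CreateTableSql(...)",
    "process": {"download": "MapToNewTable(...)", "analyze": "MapToNewTable(...)"},
    "export": "CreateTableSql(...)",
})
assert set(tasks) == {"load", "process.download", "process.analyze", "export"}
```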

[Task graph: load → process.download → process.analyze → export]
class PipelineMain(Pipeline):
    def __init__(self):
        self.nested = PipelineNested(self.outputof("load"))

    def create_tasks(self):
        return {
            "load": CreateTableSql(
                table=Table("people"),
                sql=folder().joinpath("load_people.sql").read_text(),
            ),
            "process": self.nested,
            "export": CreateTableSql(
                table=Table("export"),
                sql=folder().joinpath("export_result.sql").read_text(),
                locals={"stats": self.outputof("process.analyze")},
            )
        }

main = PipelineMain()
nested = main.nested

class PipelineNested(Pipeline):
    def __init__(self, source_table: Resolves[Table]):
        self._source_table = source_table

    def create_tasks(self):
        return {
            "download": MapToNewTable(
                source_table=self._source_table,
                select="SELECT person_id, url FROM {{source}}",
                table=Table("pages", "tmp"),
                columns=[
                    ValueColumn("person_id", "INT"),
                    ValueColumn("page_num", "INT"),
                    ValueColumn("json", "JSONB"),
                ],
                fn=compose(download_pages, pop_id_fields("person_id")),
            ),
            "analyze": MapToNewTable(
                source_table=self.outputof("download"),
                select="SELECT person_id, json FROM {{source}}",
                table=Table("stats", "html"),
                columns=[
                    ValueColumn("person_id", "INT"),
                    ValueColumn("score", "FLOAT"),
                    ValueColumn("rank", "INT"),
                ],
                fn=compose(analyze_person, pop_id_fields("person_id")),
            )
        }

Note that Pipeline.outputof() takes a path relative to that pipeline’s root.

In the example above, main.outputof("process.analyze") and nested.outputof("analyze") refer to the same task.
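One way to picture this: resolving a relative path just prepends the prefix under which the pipeline was nested. FLAT_TASKS and outputof_from() below are hypothetical illustrations, not ralsei API:

```python
# Hypothetical illustration (not ralsei API): a flattened task registry
# and path resolution relative to a pipeline's position in it.
FLAT_TASKS = {
    "load": "load task",
    "process.download": "download task",
    "process.analyze": "analyze task",
    "export": "export task",
}

def outputof_from(prefix: str, path: str) -> str:
    """Resolve `path` relative to the pipeline mounted at `prefix`."""
    full = f"{prefix}.{path}" if prefix else path
    return FLAT_TASKS[full]

# main sits at the root; nested was mounted under the "process" prefix,
# so both calls name the same task:
assert outputof_from("", "process.analyze") == outputof_from("process", "analyze")
```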

Nested dictionaries

For the sake of convenience, nested dictionaries are allowed in create_tasks():

return {
    "group": {
        "parse": MapToNewTable(...),
        "analyze": MapToNewTable(...),
    },
    "other": CreateTableSql(...),
}

There is, however, no way to write outputof() relative to a dictionary. You have to refer to those tasks using their full paths (group.parse, group.analyze).