Pipelines¶
In Pipeline.create_tasks() you declare tasks and their dependencies, which after resolution become a ralsei.graph.DAG:
class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "create": CreateTableSql(
                table=Table("records"),
                sql="""\
                CREATE TABLE {{table}}(
                    id INTEGER PRIMARY KEY,
                    first_name TEXT,
                    last_name TEXT
                )
                {%split%}
                INSERT INTO {{table}}(first_name, last_name)
                VALUES ('Fyodor', 'Dostoevsky')""",
            ),
            "update": AddColumnsSql(
                table=self.outputof("create"),
                columns=[Column("full_name")],
                sql="""\
                UPDATE {{table}}
                SET full_name = first_name || ' ' || last_name""",
            ),
        }
Dependency resolution¶
Here, self.outputof() serves a double purpose: it resolves to the output table of the "create" task, and it signifies that "update" depends on "create", forming an edge in the dependency graph.
ralsei.graph.OutputOf can be used in place of ralsei.types.Table:

- In task constructor arguments, where explicitly allowed
- In SQL templates, since everything that goes into a {{ value }} block is automatically resolved
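To make the second point concrete, here is a minimal sketch of how a deferred reference could be resolved inside a template block. The class and function names are hypothetical stand-ins, not ralsei's actual API:

```python
import re


class FakeOutputOf:
    """Stand-in for ralsei.graph.OutputOf: a deferred reference to a task's output table."""

    def __init__(self, task_name: str):
        self.task_name = task_name


def render(template: str, params: dict, task_outputs: dict) -> str:
    """Substitute {{ name }} blocks, resolving deferred references first."""

    def substitute(match: re.Match) -> str:
        value = params[match.group(1)]
        if isinstance(value, FakeOutputOf):
            # A deferred reference is resolved to the task's output automatically
            value = task_outputs[value.task_name]
        return str(value)

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


task_outputs = {"create": "records"}
sql = render(
    "UPDATE {{table}} SET x = 1",
    {"table": FakeOutputOf("create")},
    task_outputs,
)
print(sql)  # UPDATE records SET x = 1
```

The point is only that the template author never has to resolve the reference by hand; whatever lands in a `{{ value }}` slot is resolved before rendering.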
In some cases you may depend on two or more tasks that add columns to the same table, regardless of order.
In that case, self.outputof("update1", "update2") will resolve to Table("records") and mark both of these tasks as dependencies.
Warning
When using outputof() with multiple arguments, all of them must resolve to the same table. outputof("create", "update1") will throw an error.
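The rule can be sketched in plain Python (an illustrative model, not ralsei's actual implementation): every task named in a multi-argument outputof() must write to the same table, otherwise resolution fails.

```python
def resolve_outputof(outputs: dict[str, str], *task_names: str) -> str:
    """Return the single output table shared by the given tasks.

    Raises ValueError if the tasks do not all resolve to the same table.
    """
    tables = {outputs[name] for name in task_names}
    if len(tables) != 1:
        raise ValueError(
            f"tasks {task_names} resolve to different tables: {sorted(tables)}"
        )
    return tables.pop()


# Hypothetical task -> output-table mapping mirroring the example above
outputs = {
    "create": "records",
    "update1": "records",
    "update2": "records",
}

print(resolve_outputof(outputs, "update1", "update2"))  # records
# resolve_outputof(outputs, "create", "other_table_task") would raise ValueError
```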
Nested pipelines¶
You can also nest one pipeline inside another by including it in the dictionary:
"process": PipelineNested()
Then, its tasks will start with the process. prefix.
class PipelineMain(Pipeline):
    def __init__(self):
        self.nested = PipelineNested(self.outputof("load"))

    def create_tasks(self):
        return {
            "load": CreateTableSql(
                table=Table("people"),
                sql=folder().joinpath("load_people.sql").read_text(),
            ),
            "process": self.nested,
            "export": CreateTableSql(
                table=Table("export"),
                sql=folder().joinpath("export_result.sql").read_text(),
                locals={"stats": self.outputof("process.analyze")},
            ),
        }


main = PipelineMain()
nested = main.nested
class PipelineNested(Pipeline):
    def __init__(self, source_table: Resolves[Table]):
        self._source_table = source_table

    def create_tasks(self):
        return {
            "download": MapToNewTable(
                source_table=self._source_table,
                select="SELECT person_id, url FROM {{source}}",
                table=Table("pages", "tmp"),
                columns=[
                    ValueColumn("person_id", "INT"),
                    ValueColumn("page_num", "INT"),
                    ValueColumn("json", "JSONB"),
                ],
                fn=compose(download_pages, pop_id_fields("person_id")),
            ),
            "analyze": MapToNewTable(
                source_table=self.outputof("download"),
                select="SELECT person_id, json FROM {{source}}",
                table=Table("stats", "html"),
                columns=[
                    ValueColumn("person_id", "INT"),
                    ValueColumn("score", "FLOAT"),
                    ValueColumn("rank", "INT"),
                ],
                fn=compose(analyze_person, pop_id_fields("person_id")),
            ),
        }
Note that Pipeline.outputof() accepts a path relative to the pipeline's root. In the example above, main.outputof("process.analyze") and nested.outputof("analyze") refer to the same task.
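The path semantics can be modeled with a small sketch (hypothetical FakePipeline class, not ralsei's internals): a nested pipeline resolves paths relative to its own prefix, so the two spellings above name the same task.

```python
class FakePipeline:
    """Toy model of relative task paths in nested pipelines."""

    def __init__(self, prefix: str = ""):
        self.prefix = prefix  # "" for the root pipeline

    def outputof(self, path: str) -> str:
        """Return the absolute dotted path of a task, relative to this pipeline."""
        return f"{self.prefix}.{path}" if self.prefix else path


fake_main = FakePipeline()                    # plays the role of PipelineMain
fake_nested = FakePipeline(prefix="process")  # plays the role of PipelineNested

# Both resolve to the same absolute task path
print(fake_main.outputof("process.analyze"))  # process.analyze
print(fake_nested.outputof("analyze"))        # process.analyze
```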
Nested dictionaries¶
For the sake of convenience, nested dictionaries are allowed in create_tasks():
return {
    "group": {
        "parse": MapToNewTable(...),
        "analyze": MapToNewTable(...),
    },
    "other": CreateTableSql(...),
}
There is, however, no way to write outputof() relative to a dictionary. You have to refer to those tasks using their full paths (group.parse, group.analyze).
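The flattening of nested dictionaries into dotted paths can be sketched as follows (an illustrative model; ralsei's real resolver also handles nested Pipeline objects, not just dicts):

```python
def flatten_tasks(tasks: dict, prefix: str = "") -> dict[str, object]:
    """Flatten nested task dictionaries into a flat {dotted_path: task} mapping."""
    flat: dict[str, object] = {}
    for name, value in tasks.items():
        path = f"{prefix}.{name}" if prefix else name
        if isinstance(value, dict):
            # Nested dictionary: recurse, extending the path
            flat.update(flatten_tasks(value, path))
        else:
            # Leaf: an actual task
            flat[path] = value
    return flat


# Placeholder strings stand in for the task objects from the example above
tasks = {
    "group": {"parse": "MapToNewTable(...)", "analyze": "MapToNewTable(...)"},
    "other": "CreateTableSql(...)",
}
print(sorted(flatten_tasks(tasks)))  # ['group.analyze', 'group.parse', 'other']
```

This is why outputof() must use the full path: after flattening, "group.parse" is the only name the graph knows the task by.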