Tasks¶
Tasks are individual database actions that you can run, revert, or check the status of. They build your dataset piece by piece, usually by:
- creating a new table and filling it with data
- adding columns to an existing table and filling them with data
In most cases, you can compose your data pipeline out of just 4 Builtin Tasks:
| Written in | Create Table | Add Columns |
|---|---|---|
| SQL | CreateTableSql | AddColumnsSql |
| Python | MapToNewTable | MapToNewColumns |
However, if you need a dynamically generated table, where the columns aren’t known in advance, or a task with multiple outputs, you may need to create a Custom Task.
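For orientation, here is a minimal sketch of how two of these builtin tasks might be combined into a pipeline. The table and file names are illustrative, and it assumes CreateTableSql and AddColumnsSql are importable from the top-level ralsei package like the other classes used on this page:

from pathlib import Path
from ralsei import Pipeline, CreateTableSql, AddColumnsSql, Table

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            # Create and fill a table from an SQL script (illustrative file name)
            "unnest": CreateTableSql(
                sql=Path("./unnest.sql").read_text(),
                table=Table("new_table"),
            ),
            # Add computed columns to the table created by the task above
            "postprocess": AddColumnsSql(
                sql=Path("./postprocess.sql").read_text(),
                table=self.outputof("unnest"),
            ),
        }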
Builtin Tasks¶
- class CreateTableSql¶
Bases: ralsei.task.base.TaskDef
Runs a CREATE TABLE SQL script
Variables passed to the template: table, view
Example
unnest.sql
CREATE TABLE {{table}}(
    id SERIAL PRIMARY KEY,
    name TEXT
);
{%-split-%}
INSERT INTO {{table}}(name)
SELECT json_array_elements_text(json->'names')
FROM {{sources}};
pipeline.py
"unnest": CreateTableSql( sql=Path("./unnest.sql").read_text(), table=Table("new_table"), locals={"sources": self.outputof("other")}, )
Note
You can use ralsei.utils.folder() to find SQL files relative to the current file.
- sql : str | list[str] = None¶
  SQL template strings
  Individual statements must be either separated by the {%split%} tag or pre-split into a list
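For example, the two statements from unnest.sql above could instead be passed pre-split as a list inside a pipeline's create_tasks() (a sketch, with the statements shortened for brevity):

"unnest": CreateTableSql(
    sql=[
        "CREATE TABLE {{table}}(id SERIAL PRIMARY KEY, name TEXT);",
        "INSERT INTO {{table}}(name) SELECT json_array_elements_text(json->'names') FROM {{sources}};",
    ],
    table=Table("new_table"),
    locals={"sources": self.outputof("other")},
)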
- table : ralsei.types.Table = None¶
  Table being created
- class AddColumnsSql¶
Bases: ralsei.task.base.TaskDef
Adds the specified Columns to an existing Table and runs the SQL script to fill them with data
Variables passed to the template: table
Columns can be defined in the template itself, using {% set columns = [...] %}
Example
postprocess.sql
{% set columns = [Column("name_upper", "TEXT")] -%}
UPDATE {{table}}
SET name_upper = UPPER(name);
pipeline.py
"postprocess": AddColumnsSql( sql=Path("./postprocess.sql").read_text(), table=Table("people"), )
Note
You can use ralsei.utils.folder() to find SQL files relative to the current file.
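A sketch of that pattern for the postprocess task, assuming ralsei.utils.folder() returns the directory of the calling file as a pathlib.Path (that return type is an assumption, not stated on this page):

from ralsei.utils import folder

"postprocess": AddColumnsSql(
    # Assumption: folder() resolves to the directory containing this pipeline file
    sql=(folder() / "postprocess.sql").read_text(),
    table=Table("people"),
)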
- sql : str | list[str] = None¶
  SQL template strings
  Individual statements must be either separated by the {%split%} tag or pre-split into a list
- table : ralsei.graph.Resolves[ralsei.types.Table] = None¶
  Table to add columns to
  May be the output of another task
- columns : Optional[Sequence[ralsei.types.ColumnBase]] = None¶
  These column definitions take precedence over those defined in the template
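For example, the postprocess task could declare its column in Python rather than in the template (a sketch; it assumes Column is importable from ralsei.types, as its use inside the template above suggests):

from ralsei.types import Column

"postprocess": AddColumnsSql(
    sql="UPDATE {{table}} SET name_upper = UPPER(name);",
    table=Table("people"),
    # Takes precedence over any {% set columns = [...] %} in the template
    columns=[Column("name_upper", "TEXT")],
)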
- class MapToNewTable¶
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
Variables passed to jinja:
- table = table
- source = source_table
- is_done = is_done_column (as ralsei.types.Identifier)
Example
from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewTable,
    Table,
    ValueColumn,
    Placeholder,
    compose,
    add_to_input,
    pop_id_fields,
)

# Find subjects on the hub page
def find_subjects(hub_url: str):
    html = download(hub_url)
    sel = Selector(html)
    for row in sel.xpath("//table/tr"):
        yield {
            "subject": row.xpath("a/text()").get(),
            "url": row.xpath("a/@href").get()
        }

# Download all pages in a subject rating
def download_pages(url: str):
    next_url = url
    page = 1
    while next_url is not None:
        html = download(next_url)
        yield {
            "page": page,
            "html": html
        }
        sel = Selector(html)
        next_url = sel.xpath("//a[@id = 'next']").get()
        page += 1

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "subjects": MapToNewTable(
                table=Table("subjects"),
                columns=[  # (1)
                    "id SERIAL PRIMARY KEY",
                    ValueColumn("subject", "TEXT"),
                    ValueColumn("url", "TEXT"),
                ],
                fn=compose(
                    find_subjects,
                    add_to_input(hub_url="https://rating.com/2022")
                )
            ),
            "pages": MapToNewTable(
                source_table=self.outputof("subjects"),
                select="""\
                SELECT id, url FROM {{source}}
                WHERE NOT {{is_done}}""",
                table=Table("pages"),
                columns=[
                    ValueColumn(
                        "subject_id",
                        "INT REFERENCES {{source}}(id)",
                        Placeholder("id")
                    ),
                    ValueColumn("page", "INT"),
                    ValueColumn("html", "TEXT"),
                    "date_downloaded DATE DEFAULT NOW()",
                ],
                is_done_column="__downloaded",
                fn=compose(download_pages, pop_id_fields("id"))
            )
        }
(1) The table body is generated from all columns, while the INSERT statement is generated only from ralsei.types.ValueColumnBase columns
- table : ralsei.types.Table = None¶
  The new table being created
- columns : Sequence[str | ralsei.types.ValueColumnBase] = None¶
  Columns (and constraints) that make up the table definition
  Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation.
  str columns and ralsei.types.ValueColumn’s type are passed through the jinja renderer
- fn : ralsei.wrappers.OneToMany = None¶
  A generator function, mapping one row to many rows
  If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'¶
  Task-scoped context-manager arguments passed to fn
Example
from ralsei.contextmanagers import reusable_contextmanager_const
from selenium import webdriver

@reusable_contextmanager_const
def browser_context():
    browser = webdriver.Chrome()
    yield browser
    browser.quit()

def scrape_page(browser: webdriver.Chrome):
    ...

MapToNewTable(
    fn=scrape_page,
    context={"browser": browser_context}
)
- select : Optional[str] = None¶
  The SELECT statement that generates rows passed to fn as arguments
  If not specified, fn will only run once with 0 arguments.
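For instance, a seed task might omit select entirely, so fn is called once with no arguments and every row it yields is inserted (a sketch with illustrative names):

def list_hubs():
    # Called exactly once, with no arguments, since `select` is not set
    for year in (2021, 2022):
        yield {"url": f"https://rating.com/{year}"}

"hubs": MapToNewTable(
    table=Table("hubs"),
    columns=[
        "id SERIAL PRIMARY KEY",
        ValueColumn("url", "TEXT"),
    ],
    fn=list_hubs,
)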
- source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None¶
  The table where the input rows come from
  If not creating is_done_column, you can leave it as None.
  May be the output of another task.
- is_done_column : Optional[str] = None¶
  Create a boolean column with the given name in source_table that tracks which rows have been processed
  If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
  Note
  Make sure to include WHERE NOT {{is_done}} in your select statement
- id_fields : Optional[list[ralsei.types.IdColumn]] = None¶
  Columns that uniquely identify a row in source_table, so that you can update is_done_column
  This argument takes precedence over id_fields inferred from fn’s metadata
- class MapToNewColumns¶
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, saving outputs into new columns on the same row
Variables passed to jinja:
- table = table
- is_done = is_done_column (as ralsei.types.Identifier)
Example
import requests
from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewColumns,
    Table,
    ValueColumn,
    Sql,
    compose_one,
    pop_id_fields,
)

def download(url: str):
    response = requests.get(url)
    response.raise_for_status()
    return {"html": response.text}

def parse(html: str):
    sel = Selector(html)
    return {
        "title": sel.xpath("//h1/text()").get(),
        "rating": sel.xpath("//div[@id='rating']/text()").get(),
    }

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "download": MapToNewColumns(
                table=Table("pages"),
                select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                columns=[
                    ValueColumn("html", "TEXT"),
                    ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                ],
                is_done_column="__downloaded",
                fn=compose_one(download, pop_id_fields("id")),
            ),
            "parse": MapToNewColumns(
                table=self.outputof("download"),
                select="SELECT id, html FROM {{table}}",
                columns=[
                    ValueColumn("title", "TEXT"),
                    ValueColumn("rating", "TEXT"),
                ],
                fn=compose_one(parse, pop_id_fields("id")),
            ),
        }
- table : ralsei.graph.Resolves[ralsei.types.Table] = None¶
  Table to add columns to
  May be the output of another task
- columns : Sequence[ralsei.types.ValueColumnBase] = None¶
  List of new columns
  Used for ADD COLUMN and UPDATE statement generation.
- fn : ralsei.wrappers.OneToOne = None¶
  Function that maps one row to values of the new columns in the same row
  If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'¶
  Task-scoped context-manager arguments passed to fn
Example
from ralsei.contextmanagers import reusable_contextmanager_const
from selenium import webdriver

@reusable_contextmanager_const
def browser_context():
    browser = webdriver.Chrome()
    yield browser
    browser.quit()

def scrape_page(browser: webdriver.Chrome):
    ...

MapToNewColumns(
    fn=scrape_page,
    context={"browser": browser_context}
)
- is_done_column : Optional[str] = None¶
  Create a boolean column with the given name in table that tracks which rows have been processed
  If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
  Note
  Make sure to include WHERE NOT {{is_done}} in your select statement
- id_fields : Optional[list[ralsei.types.IdColumn]] = None¶
  Columns that uniquely identify a row in table, so that you can update is_done_column
  This argument takes precedence over id_fields inferred from fn’s metadata
Custom Task¶
To start with, all tasks consist of a TaskDef part - a configuration object that stores the task arguments - and a TaskImpl part that gets instantiated later.
from ralsei.task import TaskDef, TaskImpl
class LoadCsv(TaskDef):
    # Put task parameters here

    class Impl(TaskImpl):
        """Task implementation"""
The dataclasses.dataclass() decorator is implicitly applied to any TaskDef descendant, so you can just declare the parameters as class attributes:
from pathlib import Path
from ralsei.types import Table
class LoadCsv(TaskDef):
    table: Table  # Target table
    path: Path
Then, initialize the Impl by implementing TaskImpl.prepare(). If you’re going to resolve dependencies with TaskImpl.resolve() or render templates with TaskImpl.env (unless they are dynamically generated), you have to do it during this stage. Additionally, you can save your rendered SQL statements into TaskImpl._scripts, so that they can be viewed in the CLI.
class LoadCsv(TaskDef):
    ...

    class Impl(TaskImpl):
        def prepare(self, this: "LoadCsv"):
            self.__table = this.table
            self.__path = this.path

            self._scripts["Drop Table"] = self.__drop_sql = self.env.render_sql(
                "DROP TABLE IF EXISTS {{table}}",
                table=this.table
            )
Finally, implement TaskImpl’s abstract methods:
- abstract _run(conn: ralsei.connection.ConnectionEnvironment)
  Run the task
- abstract _delete(conn: ralsei.connection.ConnectionEnvironment)
  Delete whatever _run() has created
- abstract _exists(conn: ralsei.connection.ConnectionEnvironment) → bool
  Check if the task has already been done
- abstract property output : Any
  Object created or modified by this task (usually a ralsei.types.Table)
  Used for resolving ralsei.graph.Pipeline.outputof()
Here we are using pandas for dynamic table generation
from typing import Any
import pandas as pd
from ralsei.connection import ConnectionEnvironment
from ralsei import db_actions
class LoadCsv(TaskDef):
    class Impl(TaskImpl):
        ...

        def _run(self, conn: ConnectionEnvironment):
            pd.read_csv(self.__path).to_sql(
                self.__table.name,
                conn.sqlalchemy,
                schema=self.__table.schema
            )

        def _exists(self, conn: ConnectionEnvironment) -> bool:
            return db_actions.table_exists(conn, self.__table)

        @property
        def output(self) -> Any:
            return self.__table

        def _delete(self, conn: ConnectionEnvironment):
            conn.sqlalchemy.execute(self.__drop_sql)
In fact, since everything except _run() is identical for table-creating tasks, you can use CreateTableTask as a base class, reducing boilerplate. Just don’t forget to call CreateTableTask._prepare_table() from within prepare():
import pandas as pd
from pathlib import Path
from ralsei.types import Table
from ralsei.task import TaskDef, CreateTableTask
from ralsei.connection import ConnectionEnvironment
class LoadCsv(TaskDef):
    table: Table
    path: Path

    class Impl(CreateTableTask):
        def prepare(self, this: "LoadCsv"):
            self._prepare_table(this.table)
            self.__path = this.path

        def _run(self, conn: ConnectionEnvironment):
            pd.read_csv(self.__path).to_sql(
                self._table.name,
                conn.sqlalchemy,
                schema=self._table.schema
            )
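To tie it together, the finished task can then be registered in a pipeline like any builtin one (a sketch; table and file names are illustrative):

from pathlib import Path
from ralsei import Pipeline
from ralsei.types import Table

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            # Load a CSV file into a new table using the custom task defined above
            "load_people": LoadCsv(
                table=Table("people"),
                path=Path("./people.csv"),
            ),
        }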
For tasks that add columns to an existing table, there’s an equivalent AddColumnsTask.