Tasks¶
Tasks are individual database actions that you can run, revert or check the status of. They build your dataset piece by piece, usually by:
- creating a new table and filling it with data
- adding columns to an existing table and filling them with data
In most cases, you can compose your data pipeline from just these four Builtin Tasks:
| Written in | Create Table | Add Columns |
|---|---|---|
| SQL | CreateTableSql | AddColumnsSql |
| Python | MapToNewTable | MapToNewColumns |
However, if you need a dynamically generated table, where the columns aren’t known in advance, or a task with multiple outputs, you may need to create a Custom Task.
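For example, a small pipeline made up of just the two SQL-based tasks might look roughly like this. This is a sketch rather than a copy of any example below: the task names, table names and SQL files are illustrative, and it assumes CreateTableSql and AddColumnsSql are importable from the ralsei top level like the other names used later on this page.
from pathlib import Path
from ralsei import Pipeline, CreateTableSql, AddColumnsSql, Table

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "create": CreateTableSql(
                sql=Path("./create.sql").read_text(),
                table=Table("people"),
            ),
            # adds columns to the table produced by the "create" task
            "postprocess": AddColumnsSql(
                sql=Path("./postprocess.sql").read_text(),
                table=self.outputof("create"),
            ),
        }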
Builtin Tasks¶
- class CreateTableSql¶
Bases: ralsei.task.base.TaskDef
Runs a CREATE TABLE SQL script
Variables passed to the template: table, view
Example
unnest.sql
CREATE TABLE {{table}}(
    id SERIAL PRIMARY KEY,
    name TEXT
);
{%-split-%}
INSERT INTO {{table}}(name)
SELECT json_array_elements_text(json->'names')
FROM {{sources}};
pipeline.py
"unnest": CreateTableSql(
    sql=Path("./unnest.sql").read_text(),
    table=Table("new_table"),
    locals={"sources": self.outputof("other")},
)
Note
You can use ralsei.utils.folder() to find SQL files relative to the current file (see the sketch below this class)
- sql : str | list[str] = None¶
  SQL template strings
  Individual statements must either be separated by a {%split%} tag or pre-split into a list
- table : ralsei.types.Table = None¶
  Table being created
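As mentioned in the note above, ralsei.utils.folder() can locate SQL files next to your pipeline module instead of relying on the working directory. A minimal sketch, assuming folder() returns a pathlib.Path pointing at the directory of the calling file:
from ralsei.utils import folder

"unnest": CreateTableSql(
    # read unnest.sql from the same directory as this pipeline file
    sql=(folder() / "unnest.sql").read_text(),
    table=Table("new_table"),
)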
- class AddColumnsSql¶
Bases: ralsei.task.base.TaskDef
Adds the specified Columns to an existing Table and runs the SQL script to fill them with data
Variables passed to the template: table
Columns can be defined in the template itself, using {% set columns = [...] %}
Example
postprocess.sql
{% set columns = [Column("name_upper", "TEXT")] -%}
UPDATE {{table}}
SET name_upper = UPPER(name);
pipeline.py
"postprocess": AddColumnsSql(
    sql=Path("./postprocess.sql").read_text(),
    table=Table("people"),
)
Note
You can use ralsei.utils.folder() to find SQL files relative to the current file
- sql : str | list[str] = None¶
  SQL template strings
  Individual statements must either be separated by a {%split%} tag or pre-split into a list
- table : ralsei.graph.Resolves[ralsei.types.Table] = None¶
  Table to add columns to
  May be the output of another task
- columns : Optional[Sequence[ralsei.types.ColumnBase]] = None¶
  These column definitions take precedence over those defined in the template (see the sketch below this class)
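Because explicitly passed columns take precedence over those set in the template, the postprocess example above could also declare its column in Python rather than inside the SQL file. A sketch reusing the same names, assuming Column (as used inside the template above) is importable from ralsei.types:
from ralsei.types import Column, Table

"postprocess": AddColumnsSql(
    sql="UPDATE {{table}} SET name_upper = UPPER(name);",
    table=Table("people"),
    columns=[Column("name_upper", "TEXT")],
)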
- class MapToNewTable¶
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
Variables passed to jinja:
- table = table
- source = source_table
- is_done = is_done_column (as ralsei.types.Identifier)
Example
from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewTable,
    Table,
    ValueColumn,
    Placeholder,
    compose,
    add_to_input,
    pop_id_fields,
)

# Find subjects on the hub page
def find_subjects(hub_url: str):
    html = download(hub_url)
    sel = Selector(html)
    for row in sel.xpath("//table/tr"):
        yield {
            "subject": row.xpath("a/text()").get(),
            "url": row.xpath("a/@href").get()
        }

# Download all pages in a subject rating
def download_pages(url: str):
    next_url = url
    page = 1

    while next_url is not None:
        html = download(next_url)
        yield {"page": page, "html": html}

        sel = Selector(html)
        next_url = sel.xpath("//a[@id = 'next']").get()
        page += 1

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "subjects": MapToNewTable(
                table=Table("subjects"),
                columns=[
                    # plain str columns appear in the table definition,
                    # but not in the INSERT statement
                    "id SERIAL PRIMARY KEY",
                    ValueColumn("subject", "TEXT"),
                    ValueColumn("url", "TEXT"),
                ],
                fn=compose(
                    find_subjects,
                    add_to_input(hub_url="https://rating.com/2022")
                )
            ),
            "pages": MapToNewTable(
                source_table=self.outputof("subjects"),
                select="""\
                SELECT id, url FROM {{source}}
                WHERE NOT {{is_done}}""",
                table=Table("pages"),
                columns=[
                    ValueColumn(
                        "subject_id",
                        "INT REFERENCES {{source}}(id)",
                        Placeholder("id")
                    ),
                    ValueColumn("page", "INT"),
                    ValueColumn("html", "TEXT"),
                    "date_downloaded DATE DEFAULT NOW()",
                ],
                is_done_column="__downloaded",
                fn=compose(download_pages, pop_id_fields("id"))
            )
        }
The table body is generated from all columns; the INSERT statement only from ralsei.types.ValueColumnBase columns
- table : ralsei.types.Table = None¶
  The new table being created
- columns : Sequence[str | ralsei.types.ValueColumnBase] = None¶
  Columns (and constraints) that make up the table definition
  Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation. str columns and ralsei.types.ValueColumn's type are passed through the jinja renderer
- fn : ralsei.wrappers.OneToMany = None¶
  A generator function, mapping one row to many rows
  If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'¶
  Task-scoped context-manager arguments passed to fn
  Example
  from ralsei.contextmanagers import reusable_contextmanager_const
  from selenium import webdriver

  @reusable_contextmanager_const
  def browser_context():
      browser = webdriver.Chrome()
      yield browser
      browser.quit()

  def scrape_page(browser: webdriver.Chrome):
      ...

  MapToNewTable(
      fn=scrape_page,
      context={"browser": browser_context}
  )
- select : Optional[str] = None¶
  The SELECT statement that generates rows passed to fn as arguments
  If not specified, fn will only run once with 0 arguments (see the sketch below this class).
- source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None¶
  The table where the input rows come from
  If not creating is_done_column, you can leave it as None.
  May be the output of another task.
- is_done_column : Optional[str] = None¶
  Create a boolean column with the given name in source_table that tracks which rows have been processed
  If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
  Note
  Make sure to include WHERE NOT {{is_done}} in your select statement
- id_fields : Optional[list[ralsei.types.IdColumn]] = None¶
  Columns that uniquely identify a row in source_table, so that you can update is_done_column
  This argument takes precedence over id_fields inferred from fn's metadata
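As noted for the select field above, omitting it makes fn run once with no arguments, which is handy for seeding a table from scratch. A minimal sketch with illustrative table and column names:
def seed_subjects():
    # runs once, since no select statement is given
    for name in ["math", "physics", "chemistry"]:
        yield {"subject": name}

"seed": MapToNewTable(
    table=Table("subjects"),
    columns=[ValueColumn("subject", "TEXT")],
    fn=seed_subjects,
)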
- class MapToNewColumns¶
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, saving outputs into new columns on the same row
Variables passed to jinja:
- table = table
- is_done = is_done_column (as ralsei.types.Identifier)
Example
import requests
from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewColumns,
    Table,
    ValueColumn,
    Sql,
    compose_one,
    pop_id_fields,
)

def download(url: str):
    response = requests.get(url)
    response.raise_for_status()
    return {"html": response.text}

def parse(html: str):
    sel = Selector(html)
    return {
        "title": sel.xpath("//h1/text()").get(),
        "rating": sel.xpath("//div[@id='rating']/text()").get(),
    }

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "download": MapToNewColumns(
                table=Table("pages"),
                select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                columns=[
                    ValueColumn("html", "TEXT"),
                    ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                ],
                is_done_column="__downloaded",
                fn=compose_one(download, pop_id_fields("id")),
            ),
            "parse": MapToNewColumns(
                table=self.outputof("download"),
                select="SELECT id, html FROM {{table}}",
                columns=[
                    ValueColumn("title", "TEXT"),
                    ValueColumn("rating", "TEXT"),
                ],
                fn=compose_one(parse, pop_id_fields("id")),
            ),
        }
- table : ralsei.graph.Resolves[ralsei.types.Table] = None¶
  Table to add columns to
  May be the output of another task
- columns : Sequence[ralsei.types.ValueColumnBase] = None¶
  List of new columns
  Used for ADD COLUMN and UPDATE statement generation.
- fn : ralsei.wrappers.OneToOne = None¶
  Function that maps one row to values of the new columns in the same row
  If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'¶
  Task-scoped context-manager arguments passed to fn
  Example
  from ralsei.contextmanagers import reusable_contextmanager_const
  from selenium import webdriver

  @reusable_contextmanager_const
  def browser_context():
      browser = webdriver.Chrome()
      yield browser
      browser.quit()

  def scrape_page(browser: webdriver.Chrome):
      ...

  MapToNewColumns(
      fn=scrape_page,
      context={"browser": browser_context}
  )
- is_done_column : Optional[str] = None¶
  Create a boolean column with the given name in table that tracks which rows have been processed
  If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
  Note
  Make sure to include WHERE NOT {{is_done}} in your select statement
- id_fields : Optional[list[ralsei.types.IdColumn]] = None¶
  Columns that uniquely identify a row in table, so that you can update is_done_column
  This argument takes precedence over id_fields inferred from fn's metadata
Custom Task¶
To start with, all tasks consist of a TaskDef part -
a configuration object that stores the task arguments -
and a TaskImpl part that gets instantiated later.
from ralsei.task import TaskDef, TaskImpl

class LoadCsv(TaskDef):
    # Put task parameters here

    class Impl(TaskImpl):
        """Task implementation"""
The dataclasses.dataclass() decorator is implicitly applied to any TaskDef descendant,
so you can just declare the parameters as class attributes:
from pathlib import Path
from ralsei.types import Table

class LoadCsv(TaskDef):
    table: Table  # Target table
    path: Path
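Since the dataclass machinery generates the constructor for you, the task can then be configured with plain keyword arguments; the table and file names here are illustrative:
LoadCsv(table=Table("people"), path=Path("people.csv"))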
Then, initialize the Impl by implementing TaskImpl.prepare().
If you’re going to resolve dependencies with TaskImpl.resolve()
or render templates with TaskImpl.env (unless they are dynamically generated),
you have to do it during this stage.
Additionally, you can save your rendered SQL statements into TaskImpl._scripts,
so that they can be viewed in the CLI.
class LoadCsv(TaskDef):
    ...

    class Impl(TaskImpl):
        def prepare(self, this: "LoadCsv"):
            self.__table = this.table
            self.__path = this.path

            self._scripts["Drop Table"] = self.__drop_sql = self.env.render_sql(
                "DROP TABLE IF EXISTS {{table}}",
                table=this.table
            )
Finally, implement TaskImpl's abstract methods:
- abstract _run(conn: ralsei.connection.ConnectionEnvironment)
  Run the task
- abstract _delete(conn: ralsei.connection.ConnectionEnvironment)
  Delete whatever _run() has created
- abstract _exists(conn: ralsei.connection.ConnectionEnvironment) → bool
  Check if the task has already been done
- abstract property output : Any
  Object created or modified by this task (usually a ralsei.types.Table)
  Used for resolving ralsei.graph.Pipeline.outputof()
Here, we are using pandas for dynamic table generation:
from typing import Any

import pandas as pd

from ralsei.connection import ConnectionEnvironment
from ralsei import db_actions

class LoadCsv(TaskDef):
    class Impl(TaskImpl):
        ...

        def _run(self, conn: ConnectionEnvironment):
            pd.read_csv(self.__path).to_sql(
                self.__table.name,
                conn.sqlalchemy,
                schema=self.__table.schema
            )

        def _exists(self, conn: ConnectionEnvironment) -> bool:
            return db_actions.table_exists(conn, self.__table)

        @property
        def output(self) -> Any:
            return self.__table

        def _delete(self, conn: ConnectionEnvironment):
            conn.sqlalchemy.execute(self.__drop_sql)
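With the implementation complete, LoadCsv can be registered in a pipeline like any builtin task, and outputof() will resolve to the table returned by its output property. A sketch with illustrative task, table and file names, reusing the imports from the snippets above:
class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "load": LoadCsv(table=Table("people"), path=Path("people.csv")),
            # adds columns to the table produced by the custom task
            "postprocess": AddColumnsSql(
                sql=Path("./postprocess.sql").read_text(),
                table=self.outputof("load"),
            ),
        }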
In fact, since everything except _run() is identical
for table-creating tasks, you can use CreateTableTask as a base class,
reducing boilerplate. Just don’t forget to call CreateTableTask._prepare_table()
from within prepare():
import pandas as pd
from pathlib import Path

from ralsei.types import Table
from ralsei.task import TaskDef, CreateTableTask
from ralsei.connection import ConnectionEnvironment

class LoadCsv(TaskDef):
    table: Table
    path: Path

    class Impl(CreateTableTask):
        def prepare(self, this: "LoadCsv"):
            self._prepare_table(this.table)
            self.__path = this.path

        def _run(self, conn: ConnectionEnvironment):
            pd.read_csv(self.__path).to_sql(
                self._table.name,
                conn.sqlalchemy,
                schema=self._table.schema
            )
For tasks that add columns to an existing table, there's an equivalent AddColumnsTask.