Tasks¶
Tasks are individual database actions that you can run, revert, or check the status of. They build your dataset piece by piece, usually by:
- creating a new table and filling it with data 
- adding columns to an existing table and filling them with data 
In most cases, you can compose your data pipeline out of just the four builtin tasks below:
| Written in | Create Table | Add Columns |
|---|---|---|
| SQL | CreateTableSql | AddColumnsSql |
| Python | MapToNewTable | MapToNewColumns |
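
For example, a pipeline that creates a table from one SQL script and then fills extra columns with another could be assembled roughly like this (a minimal sketch: the task names, table name, and SQL files are hypothetical, and it assumes CreateTableSql and AddColumnsSql are importable from the top-level ralsei package like the other builtins shown below):

    from pathlib import Path

    from ralsei import Pipeline, CreateTableSql, AddColumnsSql, Table

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                # Create and fill a new table from a SQL script
                "unnest": CreateTableSql(
                    sql=Path("./unnest.sql").read_text(),
                    table=Table("new_table"),
                ),
                # Add computed columns to the table produced by the task above
                "postprocess": AddColumnsSql(
                    sql=Path("./postprocess.sql").read_text(),
                    table=self.outputof("unnest"),
                ),
            }
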
However, if you need a dynamically generated table, where the columns aren’t known in advance, or a task with multiple outputs, you may need to create a Custom Task.
Builtin Tasks¶
class CreateTableSql¶

Bases: ralsei.task.base.TaskDef

Runs a CREATE TABLE sql script.

Variables passed to the template: table, view

Example

unnest.sql

    CREATE TABLE {{table}}(
        id SERIAL PRIMARY KEY,
        name TEXT
    );
    {%-split-%}
    INSERT INTO {{table}}(name)
    SELECT json_array_elements_text(json->'names') FROM {{sources}};

pipeline.py

    "unnest": CreateTableSql(
        sql=Path("./unnest.sql").read_text(),
        table=Table("new_table"),
        locals={"sources": self.outputof("other")},
    )

Note: You can use ralsei.utils.folder() to find SQL files relative to the current file.

sql : str | list[str] = None¶

Sql template strings. Individual statements must either be separated by a {%split%} tag or pre-split into a list.

table : ralsei.types.Table = None¶

Table being created.
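
For instance, these two ways of passing sql are meant to be equivalent (a sketch with arbitrary statements, not taken from the library's own examples):

    # A single template, with statements separated by the {%split%} tag
    CreateTableSql(
        sql="""\
        CREATE TABLE {{table}}(x INT);
        {%split%}
        INSERT INTO {{table}} VALUES (1);""",
        table=Table("numbers"),
    )

    # The same statements pre-split into a list
    CreateTableSql(
        sql=[
            "CREATE TABLE {{table}}(x INT);",
            "INSERT INTO {{table}} VALUES (1);",
        ],
        table=Table("numbers"),
    )
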
class AddColumnsSql¶

Bases: ralsei.task.base.TaskDef

Adds the specified Columns to an existing Table and runs the SQL script to fill them with data.

Variables passed to the template: table

Columns can be defined in the template itself, using {% set columns = [...] %}

Example

postprocess.sql

    {% set columns = [Column("name_upper", "TEXT")] -%}

    UPDATE {{table}}
    SET name_upper = UPPER(name);

pipeline.py

    "postprocess": AddColumnsSql(
        sql=Path("./postprocess.sql").read_text(),
        table=Table("people"),
    )

Note: You can use ralsei.utils.folder() to find SQL files relative to the current file.

sql : str | list[str] = None¶

Sql template strings. Individual statements must either be separated by a {%split%} tag or pre-split into a list.

table : ralsei.graph.Resolves[ralsei.types.Table] = None¶

Table to add columns to. May be the output of another task.

columns : Optional[Sequence[ralsei.types.ColumnBase]] = None¶

These column definitions take precedence over those defined in the template.
class MapToNewTable¶

Bases: ralsei.task.base.TaskDef

Applies the provided map function to a query result, mapping a single row to one or more rows in a new table.

Variables passed to jinja:

- table = table
- source = source_table
- is_done = is_done_column (as ralsei.types.Identifier)

Example

    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewTable,
        Table,
        ValueColumn,
        Placeholder,
        compose,
        add_to_input,
        pop_id_fields,
    )

    # Find subjects on the hub page
    def find_subjects(hub_url: str):
        html = download(hub_url)
        sel = Selector(html)
        for row in sel.xpath("//table/tr"):
            yield {
                "subject": row.xpath("a/text()").get(),
                "url": row.xpath("a/@href").get()
            }

    # Download all pages in a subject rating
    def download_pages(url: str):
        next_url = url
        page = 1
        while next_url is not None:
            html = download(next_url)
            yield {
                "page": page,
                "html": html
            }
            sel = Selector(html)
            next_url = sel.xpath("//a[@id = 'next']").get()
            page += 1

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "subjects": MapToNewTable(
                    table=Table("subjects"),
                    columns=[
                        "id SERIAL PRIMARY KEY",
                        ValueColumn("subject", "TEXT"),
                        ValueColumn("url", "TEXT"),
                    ],
                    fn=compose(
                        find_subjects,
                        add_to_input(hub_url="https://rating.com/2022")
                    )
                ),
                "pages": MapToNewTable(
                    source_table=self.outputof("subjects"),
                    select="""\
                    SELECT id, url FROM {{source}}
                    WHERE NOT {{is_done}}""",
                    table=Table("pages"),
                    columns=[
                        ValueColumn(
                            "subject_id",
                            "INT REFERENCES {{source}}(id)",
                            Placeholder("id")
                        ),
                        ValueColumn("page", "INT"),
                        ValueColumn("html", "TEXT"),
                        "date_downloaded DATE DEFAULT NOW()",
                    ],
                    is_done_column="__downloaded",
                    fn=compose(download_pages, pop_id_fields("id"))
                )
            }

Table body is generated from all columns, the INSERT statement only from ralsei.types.ValueColumnBase.

table : ralsei.types.Table = None¶

The new table being created.

columns : Sequence[str | ralsei.types.ValueColumnBase] = None¶

Columns (and constraints) that make up the table definition. Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation. str columns and ralsei.types.ValueColumn's type are passed through the jinja renderer.

fn : ralsei.wrappers.OneToMany = None¶

A generator function, mapping one row to many rows. If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields().

context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = field(...)¶

Task-scoped context-manager arguments passed to fn.

Example

    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver

    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        yield browser
        browser.quit()

    def scrape_page(browser: webdriver.Chrome):
        ...

    MapToNewTable(
        fn=scrape_page,
        context={"browser": browser_context}
    )

select : Optional[str] = None¶

The SELECT statement that generates rows passed to fn as arguments. If not specified, fn will only run once with 0 arguments.

source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None¶

The table where the input rows come from. If not creating is_done_column, you can leave it as None. May be the output of another task.

is_done_column : Optional[str] = None¶

Create a boolean column with the given name in source_table that tracks which rows have been processed. If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note: Make sure to include WHERE NOT {{is_done}} in your select statement.

id_fields : Optional[list[ralsei.types.IdColumn]] = None¶

Columns that uniquely identify a row in source_table, so that you can update is_done_column. This argument takes precedence over id_fields inferred from fn's metadata.

class MapToNewColumns¶

Bases: ralsei.task.base.TaskDef

Applies the provided map function to a query result, saving outputs into new columns on the same row.

Variables passed to jinja:

- table = table
- is_done = is_done_column (as ralsei.types.Identifier)

Example

    import requests
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewColumns,
        Table,
        ValueColumn,
        Sql,
        compose_one,
        pop_id_fields,
    )

    def download(url: str):
        response = requests.get(url)
        response.raise_for_status()
        return {"html": response.text}

    def parse(html: str):
        sel = Selector(html)
        return {
            "title": sel.xpath("//h1/text()").get(),
            "rating": sel.xpath("//div[@id='rating']/text()").get(),
        }

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "download": MapToNewColumns(
                    table=Table("pages"),
                    select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                    columns=[
                        ValueColumn("html", "TEXT"),
                        ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                    ],
                    is_done_column="__downloaded",
                    fn=compose_one(download, pop_id_fields("id")),
                ),
                "parse": MapToNewColumns(
                    table=self.outputof("download"),
                    select="SELECT id, html FROM {{table}}",
                    columns=[
                        ValueColumn("title", "TEXT"),
                        ValueColumn("rating", "TEXT"),
                    ],
                    fn=compose_one(parse, pop_id_fields("id")),
                ),
            }

table : ralsei.graph.Resolves[ralsei.types.Table] = None¶

Table to add columns to. May be the output of another task.

columns : Sequence[ralsei.types.ValueColumnBase] = None¶

List of new columns. Used for ADD COLUMN and UPDATE statement generation.

fn : ralsei.wrappers.OneToOne = None¶

Function that maps one row to values of the new columns in the same row. If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields().

context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = field(...)¶

Task-scoped context-manager arguments passed to fn.

Example

    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver

    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        yield browser
        browser.quit()

    def scrape_page(browser: webdriver.Chrome):
        ...

    MapToNewColumns(
        fn=scrape_page,
        context={"browser": browser_context}
    )

is_done_column : Optional[str] = None¶

Create a boolean column with the given name in table that tracks which rows have been processed. If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note: Make sure to include WHERE NOT {{is_done}} in your select statement.

id_fields : Optional[list[ralsei.types.IdColumn]] = None¶

Columns that uniquely identify a row in table, so that you can update is_done_column. This argument takes precedence over id_fields inferred from fn's metadata.

Custom Task¶
To start with, every task consists of a TaskDef part (a configuration object
that stores the task arguments) and a TaskImpl part that gets instantiated later.
from ralsei.task import TaskDef, TaskImpl
class LoadCsv(TaskDef):
    # Put task parameters here
    class Impl(TaskImpl):
        """Task implementation"""
The dataclasses.dataclass() decorator is implicitly applied to every TaskDef descendant,
so you can declare the parameters as class attributes:
from pathlib import Path
from ralsei.types import Table
class LoadCsv(TaskDef):
    table: Table # Target table
    path: Path
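
Thanks to the generated dataclass constructor, such a task can then be created with keyword arguments like any builtin one (the values here are hypothetical):

    LoadCsv(table=Table("people"), path=Path("./people.csv"))
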
Then, initialize the Impl by implementing TaskImpl.prepare().
If you're going to resolve dependencies with TaskImpl.resolve()
or render templates with TaskImpl.env (unless they are dynamically generated),
you have to do it during this stage.
Additionally, you can save your rendered SQL statements into TaskImpl._scripts,
so that they can be viewed in the CLI:
class LoadCsv(TaskDef):
    ...
    class Impl(TaskImpl):
        def prepare(self, this: "LoadCsv"):
            self.__table = this.table
            self.__path = this.path
            self._scripts["Drop Table"] = self.__drop_sql = self.env.render_sql(
                "DROP TABLE IF EXISTS {{table}}",
                table=this.table
            )
Finally, implement TaskImpl's abstract methods:
- abstract _run(conn: ralsei.connection.ConnectionEnvironment)

  Run the task

- abstract _delete(conn: ralsei.connection.ConnectionEnvironment)

  Delete whatever _run() has created

- abstract _exists(conn: ralsei.connection.ConnectionEnvironment) -> bool

  Check if task has already been done

- abstract property output : Any

  Object created or modified by this task (usually a ralsei.types.Table). Used for resolving ralsei.graph.Pipeline.outputof().
Here we are using pandas for dynamic table generation:
from typing import Any
import pandas as pd
from ralsei.connection import ConnectionEnvironment
from ralsei import db_actions
class LoadCsv(TaskDef):
    class Impl(TaskImpl):
        ...
        def _run(self, conn: ConnectionEnvironment):
            pd.read_csv(self.__path).to_sql(
                self.__table.name,
                conn.sqlalchemy,
                schema=self.__table.schema
            )
        def _exists(self, conn: ConnectionEnvironment) -> bool:
            return db_actions.table_exists(conn, self.__table)
        @property
        def output(self) -> Any:
            return self.__table
        def _delete(self, conn: ConnectionEnvironment):
            conn.sqlalchemy.execute(self.__drop_sql)
In fact, since everything except _run() is identical
for table-creating tasks, you can use CreateTableTask as a base class,
reducing boilerplate. Just don’t forget to call CreateTableTask._prepare_table()
from within prepare():
import pandas as pd
from pathlib import Path
from ralsei.types import Table
from ralsei.task import TaskDef, CreateTableTask
from ralsei.connection import ConnectionEnvironment
class LoadCsv(TaskDef):
    table: Table
    path: Path
    class Impl(CreateTableTask):
        def prepare(self, this: "LoadCsv"):
            self._prepare_table(this.table)
            self.__path = this.path
        def _run(self, conn: ConnectionEnvironment):
            pd.read_csv(self.__path).to_sql(
                self._table.name,
                conn.sqlalchemy,
                schema=self._table.schema
            )
For tasks that add columns to an existing table, there's an equivalent AddColumnsTask base class.
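
By analogy with the CreateTableTask example above, an AddColumnsTask-based implementation might look roughly like the sketch below. Treat it purely as an illustration: the _prepare_columns() helper, the import paths for AddColumnsTask and Column, and the example table are assumptions rather than confirmed API, so check the AddColumnsTask reference for the actual method names.

    from ralsei.types import Table, Column
    from ralsei.task import TaskDef, AddColumnsTask
    from ralsei.connection import ConnectionEnvironment

    class ComputeLength(TaskDef):
        table: Table

        class Impl(AddColumnsTask):
            def prepare(self, this: "ComputeLength"):
                # Hypothetical helper, mirroring CreateTableTask._prepare_table();
                # the real AddColumnsTask API may differ
                self._prepare_columns(this.table, [Column("name_length", "INT")])

                # env.render_sql() and _scripts work as shown earlier on this page
                self._scripts["Fill"] = self.__fill_sql = self.env.render_sql(
                    "UPDATE {{table}} SET name_length = LENGTH(name);",
                    table=this.table,
                )

            def _run(self, conn: ConnectionEnvironment):
                # Only the column-filling step is custom here
                conn.sqlalchemy.execute(self.__fill_sql)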