Tasks

Tasks are individual database actions that you can run, revert, or check the status of. They build your dataset piece by piece, usually by:

  • creating a new table and filling it with data

  • adding columns to an existing table and filling them with data

In most cases, you can compose your data pipeline from just these 4 Builtin Tasks:

Written in | Create Table   | Add Columns
SQL        | CreateTableSql | AddColumnsSql
Python     | MapToNewTable  | MapToNewColumns

However, if you need a dynamically generated table whose columns aren't known in advance, or a task with multiple outputs, you may need to create a Custom Task.
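
To see how these tasks fit together, here is a minimal sketch of a pipeline that chains two of them. The task keys, table names, and SQL files are made up for illustration, and CreateTableSql / AddColumnsSql are assumed to be importable from the top-level ralsei package, like the tasks in the examples below:

from pathlib import Path
from ralsei import Pipeline, CreateTableSql, AddColumnsSql, Table

class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            # Create a table and fill it with data
            "unnest": CreateTableSql(
                sql=Path("./unnest.sql").read_text(),
                table=Table("new_table"),
            ),
            # Add columns to the table produced by the previous task
            "postprocess": AddColumnsSql(
                sql=Path("./postprocess.sql").read_text(),
                table=self.outputof("unnest"),
            ),
        }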

Builtin Tasks

class CreateTableSql

Bases: ralsei.task.base.TaskDef

Runs a CREATE TABLE SQL script

Variables passed to the template: table, view

Example

unnest.sql

CREATE TABLE {{table}}(
    id SERIAL PRIMARY KEY,
    name TEXT
);
{%-split-%}
INSERT INTO {{table}}(name)
SELECT json_array_elements_text(json->'names')
FROM {{sources}};

pipeline.py

"unnest": CreateTableSql(
    sql=Path("./unnest.sql").read_text(),
    table=Table("new_table"),
    locals={"sources": self.outputof("other")},
)

Note

You can use ralsei.utils.folder() to find SQL files relative to the current file
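
For instance, something along these lines (a sketch only: the exact signature of folder() is not documented in this section, so check the API reference):

from ralsei.utils import folder

"unnest": CreateTableSql(
    # Assumption: folder(__file__) returns this file's directory as a pathlib.Path
    sql=(folder(__file__) / "unnest.sql").read_text(),
    table=Table("new_table"),
)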

sql : str | list[str] = None

SQL template strings

Individual statements must either be separated by the {%split%} tag or pre-split into a list
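
For instance, the unnest example above could pass its statements pre-split instead of using the {%split%} tag:

"unnest": CreateTableSql(
    sql=[
        "CREATE TABLE {{table}}(id SERIAL PRIMARY KEY, name TEXT);",
        "INSERT INTO {{table}}(name) SELECT json_array_elements_text(json->'names') FROM {{sources}};",
    ],
    table=Table("new_table"),
    locals={"sources": self.outputof("other")},
)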

table : ralsei.types.Table = None

Table being created

view : bool = False

Whether this is a VIEW instead of a TABLE

class AddColumnsSql

Bases: ralsei.task.base.TaskDef

Adds the specified Columns to an existing Table and runs the SQL script to fill them with data

Variables passed to the template: table
Columns can be defined in the template itself, using {% set columns = [...] %}

Example

postprocess.sql

{% set columns = [Column("name_upper", "TEXT")] -%}

UPDATE {{table}}
SET name_upper = UPPER(name);

pipeline.py

"postprocess": AddColumnsSql(
    sql=Path("./postprocess.sql").read_text(),
    table=Table("people"),
)

Note

You can use ralsei.utils.folder() to find SQL files relative to the current file

sql : str | list[str] = None

SQL template strings

Individual statements must either be separated by the {%split%} tag or pre-split into a list

table : ralsei.graph.Resolves[ralsei.types.Table] = None

Table to add columns to

May be the output of another task

columns : Optional[Sequence[ralsei.types.ColumnBase]] = None

These column definitions take precedence over those defined in the template
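
For example, the postprocess task above could define its column in Python rather than in the template (Column is assumed to be importable from ralsei.types, as used in the template example):

from ralsei.types import Column

"postprocess": AddColumnsSql(
    sql="UPDATE {{table}} SET name_upper = UPPER(name);",
    table=Table("people"),
    columns=[Column("name_upper", "TEXT")],
)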

class MapToNewTable

Bases: ralsei.task.base.TaskDef

Applies the provided map function to a query result, mapping a single row to one or more rows in a new table

Variables passed to the template: table, source, is_done

Example

from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewTable,
    Table,
    ValueColumn,
    Placeholder,
    compose,
    add_to_input,
    pop_id_fields,
)

# Find subjects on the hub page
def find_subjects(hub_url: str):
    html = download(hub_url)
    sel = Selector(html)

    for row in sel.xpath("//table/tr"):
        yield {
            "subject": row.xpath("a/text()").get(),
            "url": row.xpath("a/@href").get()
        }

# Download all pages in a subject rating
def download_pages(url: str):
    next_url = url
    page = 1

    while next_url is not None:
        html = download(next_url)
        yield { "page": page, "html": html }

        sel = Selector(html)
        next_url = sel.xpath("//a[@id = 'next']/@href").get()
        page += 1


class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "subjects": MapToNewTable(
                table=Table("subjects"),
                columns=[ # (1)
                    "id SERIAL PRIMARY KEY",
                    ValueColumn("subject", "TEXT"),
                    ValueColumn("url", "TEXT"),
                ],
                fn=compose(
                    find_subjects,
                    add_to_input(hub_url="https://rating.com/2022")
                )
            ),
            "pages": MapToNewTable(
                source_table=self.outputof("subjects"),
                select="""\
                SELECT id, url FROM {{source}}
                WHERE NOT {{is_done}}""",
                table=Table("pages"),
                columns=[
                    ValueColumn(
                        "subject_id",
                        "INT REFERENCES {{source}}(id)",
                        Placeholder("id")
                    ),
                    ValueColumn("page", "INT"),
                    ValueColumn("html", "TEXT"),
                    "date_downloaded DATE DEFAULT NOW()",
                ],
                is_done_column="__downloaded",
                fn=compose(download_pages, pop_id_fields("id"))
            )
        }
  1. The table body is generated from all columns; the INSERT statement only from ralsei.types.ValueColumnBase columns

table : ralsei.types.Table = None

The new table being created

columns : Sequence[str | ralsei.types.ValueColumnBase] = None

Columns (and constraints) that make up the table definition

Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation

str columns and ralsei.types.ValueColumn’s type are passed through the jinja renderer

fn : ralsei.wrappers.OneToMany = None

A generator function, mapping one row to many rows

If the id_fields argument is omitted, the task will try to infer id_fields from metadata left by ralsei.wrappers.pop_id_fields()

context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'

Task-scoped context-manager arguments passed to fn

Example

from ralsei.contextmanagers import reusable_contextmanager_const
from selenium import webdriver

@reusable_contextmanager_const
def browser_context():
    browser = webdriver.Chrome()
    yield browser
    browser.quit()

def scrape_page(browser: webdriver.Chrome):
    ...

MapToNewTable(
    fn=scrape_page,
    context={"browser": browser_context}
)

select : Optional[str] = None

The SELECT statement that generates rows passed to fn as arguments

If not specified, fn will only run once with 0 arguments.
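
For example, a task that merely seeds a static table can omit select entirely (a sketch; the table and values are made up):

def seed_categories():
    # Called exactly once, with no arguments, since no select is given
    yield {"name": "books"}
    yield {"name": "movies"}

"categories": MapToNewTable(
    table=Table("categories"),
    columns=[
        "id SERIAL PRIMARY KEY",
        ValueColumn("name", "TEXT"),
    ],
    fn=seed_categories,
)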

source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None

The table where the input rows come from

If you're not creating an is_done_column, you can leave it as None.
May be the output of another task.

is_done_column : Optional[str] = None

Create a boolean column with the given name in source_table that tracks which rows have been processed

If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note

Make sure to include WHERE NOT {{is_done}} in your select statement

id_fields : Optional[list[ralsei.types.IdColumn]] = None

Columns that uniquely identify a row in source_table, so that you can update is_done_column

This argument takes precedence over id_fields inferred from fn’s metadata
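
For instance, the pages task from the example above could spell out its id fields explicitly (IdColumn is assumed to be importable from ralsei.types, matching the type annotation above):

from ralsei.types import IdColumn

"pages": MapToNewTable(
    source_table=self.outputof("subjects"),
    select="""\
    SELECT id, url FROM {{source}}
    WHERE NOT {{is_done}}""",
    table=Table("pages"),
    columns=[
        ValueColumn("subject_id", "INT REFERENCES {{source}}(id)", Placeholder("id")),
        ValueColumn("page", "INT"),
        ValueColumn("html", "TEXT"),
        "date_downloaded DATE DEFAULT NOW()",
    ],
    is_done_column="__downloaded",
    # Explicit id_fields take precedence over the metadata left by pop_id_fields()
    id_fields=[IdColumn("id")],
    fn=compose(download_pages, pop_id_fields("id")),
)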

class MapToNewColumns

Bases: ralsei.task.base.TaskDef

Applies the provided map function to a query result, saving outputs into new columns on the same row

Variables passed to the template: table, is_done

Example

import requests
from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewColumns,
    Table,
    ValueColumn,
    Sql,
    compose_one,
    pop_id_fields,
)

def download(url: str):
    response = requests.get(url)
    response.raise_for_status()
    return {"html": response.text}

def parse(html: str):
    sel = Selector(html)
    return {
        "title": sel.xpath("//h1/text()").get(),
        "rating": sel.xpath("//div[@id='rating']/text()").get(),
    }


class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "download": MapToNewColumns(
                table=Table("pages"),
                select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                columns=[
                    ValueColumn("html", "TEXT"),
                    ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                ],
                is_done_column="__downloaded",
                fn=compose_one(download, pop_id_fields("id")),
            ),
            "parse": MapToNewColumns(
                table=self.outputof("download"),
                select="SELECT id, html FROM {{table}}",
                columns=[
                    ValueColumn("title", "TEXT"),
                    ValueColumn("rating", "TEXT"),
                ],
                fn=compose_one(parse, pop_id_fields("id")),
            ),
        }

select : str = None

The SELECT statement that generates input rows passed to fn as arguments

table : ralsei.graph.Resolves[ralsei.types.Table] = None

Table to add columns to

May be the output of another task

columns : Sequence[ralsei.types.ValueColumnBase] = None

List of new columns

Used for ADD COLUMN and UPDATE statement generation.

fn : ralsei.wrappers.OneToOne = None

Function that maps one row to values of the new columns in the same row

If the id_fields argument is omitted, the task will try to infer id_fields from metadata left by ralsei.wrappers.pop_id_fields()

context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'

Task-scoped context-manager arguments passed to fn

Example

from ralsei.contextmanagers import reusable_contextmanager_const
from selenium import webdriver

@reusable_contextmanager_const
def browser_context():
    browser = webdriver.Chrome()
    yield browser
    browser.quit()

def scrape_page(browser: webdriver.Chrome):
    ...

MapToNewColumns(
    fn=scrape_page,
    context={"browser": browser_context}
)
is_done_column : Optional[str] = None

Create a boolean column with the given name in table that tracks which rows have been processed

If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note

Make sure to include WHERE NOT {{is_done}} in your select statement

id_fields : Optional[list[ralsei.types.IdColumn]] = None

Columns that uniquely identify a row in table, so that you can update is_done_column

This argument takes precedence over id_fields inferred from fn’s metadata

Custom Task

To start with, all tasks consist of a TaskDef part - a configuration object that stores the task arguments - and a TaskImpl part that gets instantiated later.

from ralsei.task import TaskDef, TaskImpl

class LoadCsv(TaskDef):
    # Put task parameters here

    class Impl(TaskImpl):
        """Task implementation"""

The dataclasses.dataclass() decorator is implicitly applied to any TaskDef descendant, so you can declare the parameters as class attributes:

from pathlib import Path
from ralsei.types import Table

class LoadCsv(TaskDef):
    table: Table # Target table
    path: Path

Then, initialize the Impl by implementing TaskImpl.prepare().

If you're going to resolve dependencies with TaskImpl.resolve() or render templates with TaskImpl.env, you have to do so at this stage (unless they are generated dynamically).

Additionally, you can save your rendered SQL statements into TaskImpl._scripts, so that they can be viewed in the CLI.

class LoadCsv(TaskDef):
    ...
    class Impl(TaskImpl):
        def prepare(self, this: "LoadCsv"):
            self.__table = this.table
            self.__path = this.path

            self._scripts["Drop Table"] = self.__drop_sql = self.env.render_sql(
                "DROP TABLE IF EXISTS {{table}}",
                table=this.table
            )

Finally, implement TaskImpl's abstract methods:

Here, pandas is used to generate the table dynamically:

from typing import Any
import pandas as pd
from ralsei.connection import ConnectionEnvironment
from ralsei import db_actions

class LoadCsv(TaskDef):
    class Impl(TaskImpl):
        ...
        def _run(self, conn: ConnectionEnvironment):
            pd.read_csv(self.__path).to_sql(
                self.__table.name,
                conn.sqlalchemy,
                schema=self.__table.schema
            )

        def _exists(self, conn: ConnectionEnvironment) -> bool:
            return db_actions.table_exists(conn, self.__table)

        @property
        def output(self) -> Any:
            return self.__table

        def _delete(self, conn: ConnectionEnvironment):
            conn.sqlalchemy.execute(self.__drop_sql)

In fact, since everything except _run() is identical for table-creating tasks, you can use CreateTableTask as a base class to reduce boilerplate. Just don't forget to call CreateTableTask._prepare_table() from within prepare():

import pandas as pd
from pathlib import Path
from ralsei.types import Table
from ralsei.task import TaskDef, CreateTableTask
from ralsei.connection import ConnectionEnvironment

class LoadCsv(TaskDef):
    table: Table
    path: Path

    class Impl(CreateTableTask):
        def prepare(self, this: "LoadCsv"):
            self._prepare_table(this.table)
            self.__path = this.path

        def _run(self, conn: ConnectionEnvironment):
            pd.read_csv(self.__path).to_sql(
                self._table.name,
                conn.sqlalchemy,
                schema=self._table.schema
            )

For tasks that add columns to an existing table, there's an equivalent AddColumnsTask.
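
A rough sketch of what such a task could look like is below. Note that the _prepare_columns() helper used here is hypothetical, named by analogy with CreateTableTask._prepare_table(); consult the AddColumnsTask API reference for the actual initialization hook:

from ralsei.types import Table, Column
from ralsei.task import TaskDef, AddColumnsTask
from ralsei.connection import ConnectionEnvironment

class ComputeNameLength(TaskDef):
    table: Table

    class Impl(AddColumnsTask):
        def prepare(self, this: "ComputeNameLength"):
            # Hypothetical setup call, assumed to register the target table
            # and the columns being added (check the API reference)
            self._prepare_columns(this.table, [Column("name_length", "INT")])

            # Render the UPDATE statement here so it can also be viewed in the CLI
            self._scripts["Fill"] = self.__fill = self.env.render_sql(
                "UPDATE {{table}} SET name_length = LENGTH(name);",
                table=this.table,
            )

        def _run(self, conn: ConnectionEnvironment):
            conn.sqlalchemy.execute(self.__fill)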