ralsei.task

Package Contents

Classes

Task

Base task class

TaskImpl

Task implementation created from TaskDef arguments

TaskDef

Stores task arguments before the task is created

CreateTableSql

Runs a CREATE TABLE sql script

AddColumnsSql

Adds the specified Columns to an existing Table and runs the SQL script to fill them with data

MapToNewTable

Applies the provided map function to a query result, mapping a single row to one or more rows in a new table

MapToNewColumns

Applies the provided map function to a query result, saving outputs into new columns on the same row

CreateTableTask

Base class for a task that performs table creation

AddColumnsTask

Base class for a task that adds columns to a table

Data

ROW_CONTEXT_ATRRIBUTE

Attribute added to exceptions that occurred while processing a row.
Stores the popped fields of that row

ROW_CONTEXT_VAR

ContextVar storing the popped fields of the currently processed row (used by ralsei.console.console for logging purposes)

API

class ralsei.task.Task

Bases: abc.ABC

Base task class

abstract run(conn: ralsei.connection.ConnectionExt)

Run the task

abstract delete(conn: ralsei.connection.ConnectionExt)

Delete whatever run() has created

redo(conn: ralsei.connection.ConnectionExt)

Calls delete() and then run()

abstract property output : Any

Object created or modified by this task (usually a ralsei.types.Table)

Used for resolving ralsei.graph.Pipeline.outputof()

abstract exists(conn: ralsei.connection.ConnectionExt) → bool

Check whether the task has already been done

scripts() → collections.abc.Iterable[tuple[str, object]]

Get SQL scripts rendered by this task

Returns:

iterable of ("name", script), where script is either:

  1. a string-like object, usually str or sqlalchemy.sql.elements.TextClause

  2. a list of string-like objects (in case of multiple statements)
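To make the contract above concrete, here is a minimal sketch of the run()/delete()/exists()/redo() interface, with a plain dict standing in for the database connection. TaskSketch and InMemoryTask are hypothetical names, not part of ralsei; redo() follows the documented behaviour (delete() followed by run()), and the loop at the end normalizes both documented shapes of a script returned by scripts().

```python
from abc import ABC, abstractmethod
from collections.abc import Iterable
from typing import Any


class TaskSketch(ABC):
    """Hypothetical stand-in for ralsei.task.Task; `conn` is a plain dict here."""

    @abstractmethod
    def run(self, conn: dict) -> None: ...

    @abstractmethod
    def delete(self, conn: dict) -> None: ...

    @abstractmethod
    def exists(self, conn: dict) -> bool: ...

    @property
    @abstractmethod
    def output(self) -> Any: ...

    def redo(self, conn: dict) -> None:
        # As documented: delete whatever run() created, then run again
        self.delete(conn)
        self.run(conn)

    def scripts(self) -> Iterable[tuple[str, object]]:
        return []


class InMemoryTask(TaskSketch):
    """Creates a "table" (a list of rows) under a key of the dict connection."""

    def __init__(self, name: str) -> None:
        self.name = name

    def run(self, conn: dict) -> None:
        conn[self.name] = ["row 1", "row 2"]

    def delete(self, conn: dict) -> None:
        conn.pop(self.name, None)

    def exists(self, conn: dict) -> bool:
        return self.name in conn

    @property
    def output(self) -> Any:
        return self.name

    def scripts(self) -> Iterable[tuple[str, object]]:
        # A script may be a single string or a list of statements
        yield "Create", "CREATE TABLE demo (id INT)"
        yield "Fill", ["INSERT INTO demo VALUES (1)", "INSERT INTO demo VALUES (2)"]


conn: dict = {}
task = InMemoryTask("demo")
if not task.exists(conn):
    task.run(conn)
task.redo(conn)  # drops and recreates "demo"

for name, script in task.scripts():
    # Normalize both documented shapes to a list of statements
    statements = script if isinstance(script, list) else [script]
    print(name, len(statements))
```

Keeping redo() in the base class means each concrete task only has to describe how to create, remove and detect its own output.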

class ralsei.task.TaskImpl(this: D, env: ralsei.jinja.ISqlEnvironment)

Bases: ralsei.task.base.Task

Task implementation created from TaskDef arguments

Parameters:
this : TaskDef

the settings object for this task

env : ralsei.jinja.ISqlEnvironment

jinja environment

Warning

It is advised against overriding __init__. Perform your initialization in prepare() instead.

Initialization

env : ralsei.jinja.ISqlEnvironment = None
_scripts : dict[str, object] = None

You can save your SQL scripts here as you render them; the key-value pairs will be returned by scripts()

Example

class Impl(TaskImpl):
    def prepare(self, this: "MyTaskDef"):
        self._scripts["Create table"] = self.__create = self.env.render(this.sql)

prepare(this: D)

Perform your initialization here

Parameters:
this : TaskDef

the settings object for this task

resolve(value: ralsei.graph.Resolves[T]) → T

Resolve a dependency

Parameters:
value : ralsei.graph.OutputOf | T

may or may not need dependency resolution

Returns:

the resolved value

Return type:

T
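Conceptually, resolve() turns "either a value or a reference to another task's output" into the value itself. The sketch below models that with a hypothetical OutputOfSketch marker and a plain dict of task outputs; it illustrates the idea only and is not ralsei.graph.OutputOf or the way ralsei looks outputs up.

```python
from dataclasses import dataclass
from typing import TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class OutputOfSketch:
    """Hypothetical marker meaning "use the output of the task with this name"."""
    task_name: str


def resolve(value: Union[OutputOfSketch, T], outputs: dict[str, object]) -> T:
    # Markers are replaced by the referenced task's output...
    if isinstance(value, OutputOfSketch):
        return outputs[value.task_name]  # type: ignore[return-value]
    # ...while plain values pass through unchanged
    return value


outputs = {"download": "pages"}  # hypothetical: task name -> output table name
print(resolve(OutputOfSketch("download"), outputs))  # pages
print(resolve("people", outputs))  # people
```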

run(conn: ralsei.connection.ConnectionExt)
delete(conn: ralsei.connection.ConnectionExt)
exists(conn: ralsei.connection.ConnectionExt) → bool
abstract _run(conn: ralsei.connection.ConnectionEnvironment)

Run the task

abstract _delete(conn: ralsei.connection.ConnectionEnvironment)

Delete whatever _run() has created

abstract _exists(conn: ralsei.connection.ConnectionEnvironment) → bool

Check whether the task has already been done

scripts() → collections.abc.Iterable[tuple[str, object]]

Get SQL scripts rendered by this task
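The public run()/delete()/exists() methods and the abstract _run()/_delete()/_exists() hooks form a template-method arrangement. The sketch below assumes, for illustration only, that the public methods merely wrap the connection (in a hypothetical Env class standing in for ConnectionEnvironment) before delegating, and that scripts() returns the items of the _scripts dict filled in during prepare(). TaskImplSketch and RecordSql are hypothetical; unlike the real TaskImpl, the constructor here takes a single SQL string.

```python
from abc import ABC, abstractmethod
from collections.abc import Iterable
from dataclasses import dataclass


@dataclass
class Env:
    """Hypothetical stand-in for ConnectionEnvironment."""
    conn: dict


class TaskImplSketch(ABC):
    def __init__(self, sql: str) -> None:
        self._scripts: dict[str, object] = {}
        self.prepare(sql)

    def prepare(self, sql: str) -> None:
        # Save rendered scripts here so that scripts() can report them
        self._scripts["Create table"] = sql

    # Public API: wrap the connection, then delegate to the hooks
    def run(self, conn: dict) -> None:
        self._run(Env(conn))

    def delete(self, conn: dict) -> None:
        self._delete(Env(conn))

    def exists(self, conn: dict) -> bool:
        return self._exists(Env(conn))

    def scripts(self) -> Iterable[tuple[str, object]]:
        return self._scripts.items()

    @abstractmethod
    def _run(self, env: Env) -> None: ...

    @abstractmethod
    def _delete(self, env: Env) -> None: ...

    @abstractmethod
    def _exists(self, env: Env) -> bool: ...


class RecordSql(TaskImplSketch):
    """Toy subclass: "executes" its script by storing it in the dict."""

    def _run(self, env: Env) -> None:
        env.conn["executed"] = self._scripts["Create table"]

    def _delete(self, env: Env) -> None:
        env.conn.pop("executed", None)

    def _exists(self, env: Env) -> bool:
        return "executed" in env.conn


conn: dict = {}
task = RecordSql("CREATE TABLE t (id INT)")
task.run(conn)
print(task.exists(conn), list(task.scripts()))
```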

class ralsei.task.TaskDef

Stores task arguments before the task is created

Any subclass of TaskDef automatically has the dataclasses.dataclass() decorator applied to it

Impl : ClassVar[type[ralsei.task.base.TaskImpl[Self]]] = None

The associated task class

Note

This field is not part of the dataclass

Example

class MyTask(TaskDef):
    class Impl(TaskImpl):
        def prepare(self, this: "MyTask"):
            ...

locals : dict[str, Any] = 'field(...)'

Local variables added to the jinja environment

create(env: ralsei.jinja.SqlEnvironment) → ralsei.task.base.TaskImpl[Self]

Instantiate the associated Impl
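One common way to get the automatic dataclass behaviour described for TaskDef is an __init_subclass__ hook that applies dataclasses.dataclass() to each new subclass. The sketch below uses that technique with hypothetical TaskDefSketch and ImplSketch classes; it is not ralsei's source. Because dataclasses skip ClassVar annotations, Impl does not become a field, and create() simply instantiates whichever Impl the subclass defines.

```python
import dataclasses
from typing import Any, ClassVar


class ImplSketch:
    """Hypothetical stand-in for TaskImpl."""

    def __init__(self, this: "TaskDefSketch") -> None:
        self.prepare(this)

    def prepare(self, this: "TaskDefSketch") -> None:
        pass


@dataclasses.dataclass
class TaskDefSketch:
    # ClassVar annotations are skipped by dataclasses, so Impl is not a field
    Impl: ClassVar[type[ImplSketch]] = ImplSketch

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Turn every subclass into a dataclass without an explicit decorator
        dataclasses.dataclass(cls)

    def create(self) -> ImplSketch:
        # Instantiate the associated (possibly nested) Impl class
        return self.Impl(self)


class Greeting(TaskDefSketch):
    name: str

    class Impl(ImplSketch):
        def prepare(self, this: "Greeting") -> None:
            self.message = f"Hello, {this.name}"


task = Greeting(name="ralsei").create()
print(task.message)  # Hello, ralsei
```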

class ralsei.task.CreateTableSql

Bases: ralsei.task.base.TaskDef

Runs a CREATE TABLE sql script

Variables passed to the template: table, view

Example

unnest.sql

CREATE TABLE {{table}}(
    id SERIAL PRIMARY KEY,
    name TEXT
);
{%-split-%}
INSERT INTO {{table}}(name)
SELECT json_array_elements_text(json->'names')
FROM {{sources}};

pipeline.py

"unnest": CreateTableSql(
    sql=Path("./unnest.sql").read_text(),
    table=Table("new_table"),
    locals={"sources": self.outputof("other")},
)

Note

You can use ralsei.utils.folder() to find SQL files relative to the current file

sql : str | list[str] = None

SQL template strings

Individual statements must either be separated by the {%split%} tag or be pre-split into a list

table : ralsei.types.Table = None

Table being created

view : bool = False

whether this is a VIEW instead of a TABLE
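As a rough model of the two accepted shapes of sql, the hypothetical helper below turns either a single template containing split markers or an already split list into a list of statements. It splits the raw text with a regular expression purely for illustration; in ralsei, {%split%} is a jinja tag, and this sketch does not reproduce how ralsei processes it.

```python
import re
from typing import Union

# Matches {%split%} as well as whitespace-trimming variants such as {%-split-%}
SPLIT_MARKER = re.compile(r"\{%-?\s*split\s*-?%\}")


def split_statements(sql: Union[str, list[str]]) -> list[str]:
    """Hypothetical helper: normalize `sql` into a list of statements."""
    parts = sql if isinstance(sql, list) else SPLIT_MARKER.split(sql)
    # Drop surrounding whitespace and any empty fragments
    return [part.strip() for part in parts if part.strip()]


template = """
CREATE TABLE {{table}}(
    id SERIAL PRIMARY KEY,
    name TEXT
);
{%-split-%}
INSERT INTO {{table}}(name) SELECT 'example';
"""

for statement in split_statements(template):
    print(statement.splitlines()[0])
```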

class ralsei.task.AddColumnsSql

Bases: ralsei.task.base.TaskDef

Adds the specified Columns to an existing Table and runs the SQL script to fill them with data

Variables passed to the template: table
Columns can be defined in the template itself, using {% set columns = [...] %}

Example

postprocess.sql

{% set columns = [Column("name_upper", "TEXT")] -%}

UPDATE {{table}}
SET name_upper = UPPER(name);

pipeline.py

"postprocess": AddColumnsSql(
    sql=Path("./postprocess.sql").read_text(),
    table=Table("people"),
)

Note

You can use ralsei.utils.folder() to find SQL files relative to the current file

sql : str | list[str] = None

SQL template strings

Individual statements must either be separated by the {%split%} tag or be pre-split into a list

table : ralsei.graph.Resolves[ralsei.types.Table] = None

Table to add columns to

May be the output of another task

columns : Optional[Sequence[ralsei.types.ColumnBase]] = None

these column definitions take precedence over those defined in the template

class ralsei.task.MapToNewTable

Bases: ralsei.task.base.TaskDef

Applies the provided map function to a query result, mapping a single row to one or more rows in a new table

Variables passed to jinja: table, source (the source_table) and is_done (the is_done_column)

Example

import requests
from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewTable,
    Table,
    ValueColumn,
    Placeholder,
    compose,
    add_to_input,
    pop_id_fields,
)

# Download a page and return its HTML
def download(url: str) -> str:
    response = requests.get(url)
    response.raise_for_status()
    return response.text

# Find subjects on the hub page
def find_subjects(hub_url: str):
    html = download(hub_url)
    sel = Selector(html)

    for row in sel.xpath("//table/tr"):
        yield {
            "subject": row.xpath("a/text()").get(),
            "url": row.xpath("a/@href").get()
        }

# Download all pages in a subject rating
def download_pages(url: str):
    next_url = url
    page = 1

    while next_url is not None:
        html = download(next_url)
        yield { "page": page, "html": html }

        sel = Selector(html)
        # Follow the href of the "next" link; .get() returns None on the last page
        next_url = sel.xpath("//a[@id = 'next']/@href").get()
        page += 1


class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "subjects": MapToNewTable(
                table=Table("subjects"),
                columns=[ # (1)
                    "id SERIAL PRIMARY KEY",
                    ValueColumn("subject", "TEXT"),
                    ValueColumn("url", "TEXT"),
                ],
                fn=compose(
                    find_subjects,
                    add_to_input(hub_url="https://rating.com/2022")
                )
            ),
            "pages": MapToNewTable(
                source_table=self.outputof("subjects"),
                select="""\
                SELECT id, url FROM {{source}}
                WHERE NOT {{is_done}}""",
                table=Table("pages"),
                columns=[
                    ValueColumn(
                        "subject_id",
                        "INT REFERENCES {{source}}(id)",
                        Placeholder("id")
                    ),
                    ValueColumn("page", "INT"),
                    ValueColumn("html", "TEXT"),
                    "date_downloaded DATE DEFAULT NOW()",
                ],
                is_done_column="__downloaded",
                fn=compose(download_pages, pop_id_fields("id"))
            )
        }

  1. The table body is generated from all columns, but the INSERT statement only from ralsei.types.ValueColumnBase columns

table : ralsei.types.Table = None

The new table being created

columns : Sequence[str | ralsei.types.ValueColumnBase] = None

Columns (and constraints) that make up the table definition

Additionally, ralsei.types.ValueColumnBase.value field is used for INSERT statement generation

str columns and ralsei.types.ValueColumn’s type are passed through the jinja renderer
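To illustrate how one column list can produce both statements, this sketch builds a CREATE TABLE from every entry and an INSERT from value columns only, then runs them against an in-memory SQLite database standing in for PostgreSQL. ValueCol, create_table_sql() and insert_sql() are hypothetical simplifications, not ralsei.types.ValueColumn; the optional third ValueCol argument names the output key to read, loosely similar to Placeholder("id") in the example above.

```python
import sqlite3
from dataclasses import dataclass
from typing import Union


@dataclass
class ValueCol:
    """Hypothetical simplified value column: name, SQL type, source key."""
    name: str
    sql_type: str
    value: str = ""  # key in each output row; defaults to the column name

    def key(self) -> str:
        return self.value or self.name


Column = Union[str, ValueCol]


def create_table_sql(table: str, columns: list[Column]) -> str:
    # The table body is generated from ALL columns (and constraints)
    body = [c if isinstance(c, str) else f"{c.name} {c.sql_type}" for c in columns]
    return f"CREATE TABLE {table}({', '.join(body)})"


def insert_sql(table: str, columns: list[Column]) -> str:
    # Only value columns take part in INSERT; the rest use their defaults
    values = [c for c in columns if isinstance(c, ValueCol)]
    names = ", ".join(c.name for c in values)
    params = ", ".join(f":{c.key()}" for c in values)
    return f"INSERT INTO {table}({names}) VALUES ({params})"


columns: list[Column] = [
    "id INTEGER PRIMARY KEY",
    ValueCol("subject_id", "INT", "id"),
    ValueCol("page", "INT"),
    "note TEXT DEFAULT 'n/a'",
]

db = sqlite3.connect(":memory:")
db.execute(create_table_sql("pages", columns))
# One input row (id=7) mapped to two output rows, as a one-to-many fn would
for out in ({"id": 7, "page": 1}, {"id": 7, "page": 2}):
    db.execute(insert_sql("pages", columns), out)
print(db.execute("SELECT subject_id, page, note FROM pages ORDER BY page").fetchall())
# [(7, 1, 'n/a'), (7, 2, 'n/a')]
```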

fn : ralsei.wrappers.OneToMany = None

A generator function, mapping one row to many rows

If the id_fields argument is omitted, the task will try to infer id_fields from the metadata left by ralsei.wrappers.pop_id_fields()

context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'

Task-scoped context-manager arguments passed to fn

Example

from ralsei.contextmanagers import reusable_contextmanager_const
from selenium import webdriver

@reusable_contextmanager_const
def browser_context():
    browser = webdriver.Chrome()
    try:
        yield browser
    finally:
        browser.quit()

def scrape_page(browser: webdriver.Chrome):
    ...

MapToNewTable(
    fn=scrape_page,
    context={"browser": browser_context}
)
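A sketch of what task-scoped context can mean in practice: every context manager is entered once before the first row, its value is passed to fn as a keyword argument on each call, and it is exited after the last row. contextlib.ExitStack does the enter/exit bookkeeping here; run_rows() and fake_browser() are hypothetical, and ralsei's reusable context managers are not reproduced.

```python
from collections.abc import Callable, Iterator
from contextlib import AbstractContextManager, ExitStack, contextmanager
from typing import Any

events: list[str] = []


@contextmanager
def fake_browser() -> Iterator[str]:
    # Stand-in for launching and quitting a real browser
    events.append("open")
    try:
        yield "browser"
    finally:
        events.append("close")


def run_rows(
    fn: Callable[..., Any],
    rows: list[dict[str, Any]],
    context: dict[str, AbstractContextManager[Any]],
) -> list[Any]:
    results = []
    with ExitStack() as stack:
        # Enter every context manager once for the whole task
        shared = {name: stack.enter_context(cm) for name, cm in context.items()}
        for row in rows:
            # Row fields and context values are both passed as keyword arguments
            results.append(fn(**row, **shared))
    return results


def scrape_page(url: str, browser: str) -> str:
    return f"{browser} fetched {url}"


print(run_rows(scrape_page, [{"url": "a"}, {"url": "b"}], {"browser": fake_browser()}))
print(events)  # ['open', 'close']
```

Entering the browser once per task rather than once per row avoids paying the start-up cost for every input row.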
select : Optional[str] = None

The SELECT statement that generates rows passed to fn as arguments

If not specified, fn will run only once, with no arguments.

source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None

The table where the input rows come from

Can be left as None if is_done_column is not set.
May be the output of another task.

is_done_column : Optional[str] = None

Create a boolean column with the given name in source_table that tracks which rows have been processed

If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note

Make sure to include WHERE NOT {{is_done}} in your select statement
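The resume-from-where-you-stopped behaviour of is_done_column can be sketched with SQLite: rows are selected with WHERE NOT on the flag, and for each row the outputs are inserted and the flag is set inside one transaction that is committed before moving on. If the loop stops midway, the next run only sees the remaining rows. process_pending(), the table layout and fake_pages() are hypothetical; ralsei generates its own SQL.

```python
import sqlite3
from collections.abc import Callable, Iterator
from typing import Any, Optional


def process_pending(
    db: sqlite3.Connection,
    fn: Callable[[str], Iterator[dict[str, Any]]],
    limit: Optional[int] = None,
) -> None:
    """Hypothetical resumable loop over unprocessed rows of `subjects`."""
    pending = db.execute(
        "SELECT id, url FROM subjects WHERE NOT __downloaded ORDER BY id"
    ).fetchall()
    for n, (row_id, url) in enumerate(pending):
        if limit is not None and n >= limit:
            break  # simulate the process being stopped midway
        # One transaction per input row: committed on success, rolled back on error
        with db:
            for out in fn(url):
                db.execute(
                    "INSERT INTO pages(subject_id, page) VALUES (?, ?)",
                    (row_id, out["page"]),
                )
            db.execute("UPDATE subjects SET __downloaded = 1 WHERE id = ?", (row_id,))


def fake_pages(url: str) -> Iterator[dict[str, Any]]:
    # Pretend every subject has exactly two pages
    for page in (1, 2):
        yield {"page": page}


db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE subjects("
    "id INTEGER PRIMARY KEY, url TEXT, __downloaded BOOLEAN NOT NULL DEFAULT 0)"
)
db.execute("CREATE TABLE pages(subject_id INT, page INT)")
db.executemany("INSERT INTO subjects(url) VALUES (?)", [("a",), ("b",), ("c",)])
db.commit()

process_pending(db, fake_pages, limit=1)  # stops after the first subject
process_pending(db, fake_pages)  # resumes with the two remaining subjects
print(db.execute("SELECT COUNT(*) FROM pages").fetchone()[0])  # 6
```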

id_fields : Optional[list[ralsei.types.IdColumn]] = None

Columns that uniquely identify a row in source_table, so that is_done_column can be updated

This argument takes precedence over id_fields inferred from fn’s metadata

class ralsei.task.MapToNewColumns

Bases: ralsei.task.base.TaskDef

Applies the provided map function to a query result, saving outputs into new columns on the same row

Variables passed to jinja: table and is_done (the is_done_column)

Example

import requests
from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewColumns,
    Table,
    ValueColumn,
    Sql,
    compose_one,
    pop_id_fields,
)

def download(url: str):
    response = requests.get(url)
    response.raise_for_status()
    return {"html": response.text}

def parse(html: str):
    sel = Selector(html)
    return {
        "title": sel.xpath("//h1/text()").get(),
        "rating": sel.xpath("//div[@id='rating']/text()").get(),
    }


class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "download": MapToNewColumns(
                table=Table("pages"),
                select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                columns=[
                    ValueColumn("html", "TEXT"),
                    ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                ],
                is_done_column="__downloaded",
                fn=compose_one(download, pop_id_fields("id")),
            ),
            "parse": MapToNewColumns(
                table=self.outputof("download"),
                select="SELECT id, html FROM {{table}}",
                columns=[
                    ValueColumn("title", "TEXT"),
                    ValueColumn("rating", "TEXT"),
                ],
                fn=compose_one(parse, pop_id_fields("id")),
            ),
        }

select : str = None

The SELECT statement that generates input rows passed to fn as arguments

table : ralsei.graph.Resolves[ralsei.types.Table] = None

Table to add columns to

May be the output of another task

columns : Sequence[ralsei.types.ValueColumnBase] = None

List of new columns

Used for ADD COLUMN and UPDATE statement generation.
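The same column list drives two kinds of statements: one ADD COLUMN per new column, and an UPDATE that writes fn's result back to the row it came from, keyed by the id field. The SQLite sketch below illustrates that flow; add_and_fill() and its (name, type) column pairs are hypothetical, and popping the id before calling fn is meant to mimic how the example above combines SELECT id, ... with pop_id_fields("id") so that fn never receives id.

```python
import sqlite3
from collections.abc import Callable
from typing import Any


def add_and_fill(
    db: sqlite3.Connection,
    table: str,
    columns: list[tuple[str, str]],
    select: str,
    fn: Callable[..., dict[str, Any]],
    id_field: str = "id",
) -> None:
    # ADD COLUMN statements generated from the column list
    for name, sql_type in columns:
        db.execute(f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}")
    # UPDATE statement generated from the same list, keyed by the id field
    assignments = ", ".join(f"{name} = :{name}" for name, _ in columns)
    update = f"UPDATE {table} SET {assignments} WHERE {id_field} = :{id_field}"

    cursor = db.execute(select)
    names = [d[0] for d in cursor.description]
    for values in cursor.fetchall():
        row = dict(zip(names, values))
        row_id = row.pop(id_field)  # fn does not receive the id
        db.execute(update, {**fn(**row), id_field: row_id})
    db.commit()


def parse(html: str) -> dict[str, Any]:
    # Toy parser: the title is the text between the <h1> tags
    start, end = html.find("<h1>") + 4, html.find("</h1>")
    return {"title": html[start:end], "n_chars": len(html)}


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE pages(id INTEGER PRIMARY KEY, html TEXT)")
db.executemany("INSERT INTO pages(html) VALUES (?)", [("<h1>One</h1>",), ("<h1>Two</h1>",)])
add_and_fill(
    db, "pages", [("title", "TEXT"), ("n_chars", "INT")], "SELECT id, html FROM pages", parse
)
print(db.execute("SELECT id, title, n_chars FROM pages ORDER BY id").fetchall())
# [(1, 'One', 12), (2, 'Two', 12)]
```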

fn : ralsei.wrappers.OneToOne = None

Function that maps one row to values of the new columns in the same row

If the id_fields argument is omitted, the task will try to infer id_fields from the metadata left by ralsei.wrappers.pop_id_fields()

context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'

Task-scoped context-manager arguments passed to fn

Example

from ralsei.contextmanagers import reusable_contextmanager_const
from selenium import webdriver

@reusable_contextmanager_const
def browser_context():
    browser = webdriver.Chrome()
    try:
        yield browser
    finally:
        browser.quit()

def scrape_page(browser: webdriver.Chrome):
    ...

MapToNewColumns(
    fn=scrape_page,
    context={"browser": browser_context}
)

is_done_column : Optional[str] = None

Create a boolean column with the given name in table that tracks which rows have been processed

If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note

Make sure to include WHERE NOT {{is_done}} in your select statement

id_fields : Optional[list[ralsei.types.IdColumn]] = None

Columns that uniquely identify a row in table, so that is_done_column can be updated

This argument takes precedence over id_fields inferred from fn’s metadata

class ralsei.task.CreateTableTask

Bases: ralsei.task.base.TaskImpl

Base class for a task that performs table creation

All you have to do is call _prepare_table() from within ralsei.task.TaskImpl.prepare().

output, _exists() and _delete() are implemented for you, leaving only ralsei.task.TaskImpl._run() for you to write

Example

from pathlib import Path

import pandas as pd
from ralsei.connection import ConnectionEnvironment
from ralsei.task import CreateTableTask, TaskDef
from ralsei.types import Table

class UploadCsv(TaskDef):
    table: Table
    path: Path

    class Impl(CreateTableTask):
        def prepare(self, this: "UploadCsv"):
            self._prepare_table(this.table)
            self.__path = this.path

        def _run(self, conn: ConnectionEnvironment):
            with self.__path.open() as file:
                pd.read_csv(file).to_sql(
                    self._table.name,
                    conn.sqlalchemy,
                    schema=self._table.schema
                )

_table : ralsei.types.Table = None
_drop_sql : sqlalchemy.sql.elements.TextClause = None
_prepare_table(table: ralsei.types.Table, view: bool = False)
property output : Any
_exists(conn: ralsei.connection.ConnectionEnvironment) → bool
_delete(conn: ralsei.connection.ConnectionEnvironment)
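The division of labour in CreateTableTask (existence checks and deletion come for free once the table is known, leaving only _run() to write) can be sketched on top of sqlite3. CreateTableSketch and Numbers are hypothetical, the sqlite_master lookup is SQLite-specific, and ralsei's own _exists()/_delete() target its connection types rather than sqlite3.

```python
import sqlite3
from abc import ABC, abstractmethod


class CreateTableSketch(ABC):
    """Hypothetical analogue of CreateTableTask, using sqlite3 directly."""

    def __init__(self) -> None:
        self.prepare()

    @abstractmethod
    def prepare(self) -> None:
        """Subclasses call _prepare_table() from here."""

    def _prepare_table(self, table: str, view: bool = False) -> None:
        self._table = table
        self._kind = "view" if view else "table"
        self._drop_sql = f"DROP {self._kind.upper()} IF EXISTS {table}"

    @property
    def output(self) -> str:
        return self._table

    def _exists(self, db: sqlite3.Connection) -> bool:
        # SQLite lists tables and views in sqlite_master
        row = db.execute(
            "SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?",
            (self._kind, self._table),
        ).fetchone()
        return row is not None

    def _delete(self, db: sqlite3.Connection) -> None:
        db.execute(self._drop_sql)

    @abstractmethod
    def _run(self, db: sqlite3.Connection) -> None:
        """The only task-specific part: create and fill the table."""


class Numbers(CreateTableSketch):
    def prepare(self) -> None:
        self._prepare_table("numbers")

    def _run(self, db: sqlite3.Connection) -> None:
        db.execute("CREATE TABLE numbers(n INT)")
        db.executemany("INSERT INTO numbers VALUES (?)", [(1,), (2,), (3,)])


db = sqlite3.connect(":memory:")
task = Numbers()
task._run(db)
print(task.output, task._exists(db))  # numbers True
task._delete(db)
print(task._exists(db))  # False
```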
class ralsei.task.AddColumnsTask

Bases: ralsei.task.base.TaskImpl

Base class for a task that adds columns to a table

All you have to do is call _prepare_columns() from within ralsei.task.TaskImpl.prepare().

output, _exists() and _delete() are implemented for you, leaving only ralsei.task.TaskImpl._run() for you to write

_table : ralsei.types.Table = None
_columns : list[ralsei.types.ColumnRendered] = None
_add_columns : ralsei.db_actions.AddColumns = None
_drop_columns : ralsei.db_actions.DropColumns = None
_prepare_columns(table: ralsei.graph.Resolves[ralsei.types.Table], columns: collections.abc.Sequence[ralsei.types.ColumnBase], *, if_not_exists: bool = False)
property output : Any
_exists(conn: ralsei.connection.ConnectionEnvironment) → bool
_delete(conn: ralsei.connection.ConnectionEnvironment)

ralsei.task.ROW_CONTEXT_ATRRIBUTE = '__ralsei_row_context'

Attribute added to exceptions that occurred while processing a row.
Stores the popped fields of that row

ralsei.task.ROW_CONTEXT_VAR : contextvars.ContextVar[dict] = 'ContextVar(...)'

ContextVar storing the popped fields of the currently processed row (used by ralsei.console.console for logging purposes)
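A sketch of how the two objects above could work together: while a row is being processed, its popped fields are stored in a ContextVar (so a logger can read them), and if the row's function raises, the same fields are attached to the exception under the attribute name. process_row() and current_row() are hypothetical, and the constants are redefined locally so the snippet runs without ralsei.

```python
import contextvars
from collections.abc import Callable
from typing import Any

# Local copies of the documented names, so the sketch is self-contained
ROW_CONTEXT_ATRRIBUTE = "__ralsei_row_context"
ROW_CONTEXT_VAR: contextvars.ContextVar[dict] = contextvars.ContextVar("row_context")


def process_row(fn: Callable[..., Any], row: dict[str, Any], id_fields: list[str]) -> Any:
    # Pop the identifying fields before calling fn
    popped = {name: row.pop(name) for name in id_fields}
    token = ROW_CONTEXT_VAR.set(popped)
    try:
        return fn(**row)
    except Exception as exc:
        # Record which row failed on the exception itself
        setattr(exc, ROW_CONTEXT_ATRRIBUTE, popped)
        raise
    finally:
        ROW_CONTEXT_VAR.reset(token)


def current_row() -> dict:
    # What a logger could read while fn is running
    return ROW_CONTEXT_VAR.get({})


def fail(url: str) -> None:
    raise ValueError(f"cannot parse {url}; row {current_row()}")


try:
    process_row(fail, {"id": 42, "url": "https://example.com"}, ["id"])
except ValueError as exc:
    print(exc)  # cannot parse https://example.com; row {'id': 42}
    print(getattr(exc, ROW_CONTEXT_ATRRIBUTE))  # {'id': 42}
```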