ralsei.task.map_to_new_table

Module Contents

Classes

MapToNewTable

Applies the provided map function to a query result, mapping a single row to one or more rows in a new table

API

class ralsei.task.map_to_new_table.MapToNewTable

Bases: ralsei.task.base.TaskDef

Applies the provided map function to a query result, mapping a single row to one or more rows in a new table

Variables passed to jinja:

Example

from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewTable,
    Table,
    ValueColumn,
    Placeholder,
    compose,
    add_to_input,
    pop_id_fields,
)

# download() is a user-defined helper (not shown) that returns the HTML of a URL

# Find subjects on the hub page
def find_subjects(hub_url: str):
    html = download(hub_url)
    sel = Selector(html)

    for row in sel.xpath("//table/tr"):
        yield {
            "subject": row.xpath("a/text()").get(),
            "url": row.xpath("a/@href").get()
        }

# Download all pages in a subject rating
def download_pages(url: str):
    next_url = url
    page = 1

    while next_url is not None:
        html = download(next_url)
        yield { "page": page, "html": html }

        sel = Selector(html)
        # Select the href attribute, not the <a> element itself
        next_url = sel.xpath("//a[@id = 'next']/@href").get()
        page += 1


class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "subjects": MapToNewTable(
                table=Table("subjects"),
                columns=[ # (1)
                    "id SERIAL PRIMARY KEY",
                    ValueColumn("subject", "TEXT"),
                    ValueColumn("url", "TEXT"),
                ],
                fn=compose(
                    find_subjects,
                    add_to_input(hub_url="https://rating.com/2022")
                )
            ),
            "pages": MapToNewTable(
                source_table=self.outputof("subjects"),
                select="""\
                SELECT id, url FROM {{source}}
                WHERE NOT {{is_done}}""",
                table=Table("pages"),
                columns=[
                    ValueColumn(
                        "subject_id",
                        "INT REFERENCES {{source}}(id)",
                        Placeholder("id")
                    ),
                    ValueColumn("page", "INT"),
                    ValueColumn("html", "TEXT"),
                    "date_downloaded DATE DEFAULT NOW()",
                ],
                is_done_column="__downloaded",
                fn=compose(download_pages, pop_id_fields("id"))
            )
        }

  1. The table body is generated from all columns; the INSERT statement is generated only from ralsei.types.ValueColumnBase entries

table : ralsei.types.Table = None

The new table being created

columns : Sequence[str | ralsei.types.ValueColumnBase] = None

Columns (and constraints) that make up the table definition

Additionally, the ralsei.types.ValueColumnBase.value field is used to generate the INSERT statement

str columns and the type of each ralsei.types.ValueColumn are passed through the jinja renderer
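To make the split between the table definition and the INSERT statement concrete, here is a minimal sketch in plain Python. It is not ralsei's SQL generation: ValueCol, create_table_sql and insert_sql are hypothetical stand-ins, and the named-parameter style (:name) is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Sequence, Union


@dataclass
class ValueCol:
    """Hypothetical stand-in for a value column: a name and an SQL type."""
    name: str
    type: str


def create_table_sql(table: str, columns: Sequence[Union[str, ValueCol]]) -> str:
    # Every entry contributes to the table body: plain strings are used verbatim,
    # value columns are rendered as "name type".
    body = ",\n    ".join(
        col if isinstance(col, str) else f"{col.name} {col.type}" for col in columns
    )
    return f"CREATE TABLE {table} (\n    {body}\n);"


def insert_sql(table: str, columns: Sequence[Union[str, ValueCol]]) -> str:
    # Only value columns take part in the INSERT; plain strings are skipped.
    value_cols = [col for col in columns if isinstance(col, ValueCol)]
    names = ", ".join(col.name for col in value_cols)
    params = ", ".join(f":{col.name}" for col in value_cols)
    return f"INSERT INTO {table} ({names}) VALUES ({params});"


columns = [
    "id SERIAL PRIMARY KEY",
    ValueCol("subject", "TEXT"),
    ValueCol("url", "TEXT"),
]
create_stmt = create_table_sql("subjects", columns)
insert_stmt = insert_sql("subjects", columns)
```

In this sketch the id column appears in the CREATE TABLE statement but not in the INSERT, so the database fills it in.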

fn : ralsei.wrappers.OneToMany = None

A generator function, mapping one row to many rows

If the id_fields argument is omitted, the task tries to infer it from metadata left by ralsei.wrappers.pop_id_fields()
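As a rough illustration of the one-to-many shape, the sketch below is a plain generator that turns one input row into several output rows. split_authors and its field names are hypothetical. It assumes, as in the example above, that the keys of each yielded dict correspond to value column names.

```python
from typing import Iterator


def split_authors(title: str, authors: str) -> Iterator[dict]:
    """Hypothetical one-to-many function: one output row per author."""
    for position, author in enumerate(authors.split(";"), start=1):
        yield {"title": title, "author": author.strip(), "position": position}


# One input row (title, authors) produces two output rows
rows = list(split_authors(title="SICP", authors="Abelson; Sussman"))
```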

context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'

Task-scoped context-manager arguments passed to fn

Example

from ralsei.contextmanagers import reusable_contextmanager_const
from selenium import webdriver

@reusable_contextmanager_const
def browser_context():
    browser = webdriver.Chrome()
    try:
        yield browser
    finally:
        # Close the browser even if fn raises
        browser.quit()

def scrape_page(browser: webdriver.Chrome):
    ...

MapToNewTable(
    fn=scrape_page,
    context={"browser": browser_context}
)

select : Optional[str] = None

The SELECT statement that generates rows passed to fn as arguments

If not specified, fn runs only once, with no arguments.
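The two modes can be sketched with a small driver loop. This is an illustration only, not ralsei's implementation: run_map is a hypothetical name, and passing each selected column as a keyword argument is an assumption based on the example above.

```python
from typing import Callable, Iterable, Iterator, Optional


def run_map(
    fn: Callable[..., Iterator[dict]],
    input_rows: Optional[Iterable[dict]] = None,
) -> list:
    """Hypothetical driver: collect every row produced by fn."""
    if input_rows is None:
        # No select: fn runs exactly once, with no arguments.
        return list(fn())
    output = []
    for row in input_rows:
        # Assumption: each selected column is passed as a keyword argument.
        output.extend(fn(**row))
    return output


def constants():
    yield {"n": 1}
    yield {"n": 2}


def repeat(word: str, times: int):
    for i in range(times):
        yield {"word": word, "i": i}


once = run_map(constants)
many = run_map(repeat, [{"word": "a", "times": 2}, {"word": "b", "times": 1}])
```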

source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None

The table where the input rows come from

If you are not using is_done_column, you can leave this as None.
It may also be the output of another task.

is_done_column : Optional[str] = None

Create a boolean column with the given name in source_table that tracks which rows have been processed

If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note

Make sure to include WHERE NOT {{is_done}} in your select statement
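The stop-and-resume behaviour can be sketched with the standard library's sqlite3 module instead of ralsei. The table names, the failing download_pages function and the run_once helper are all hypothetical. The point is only the pattern: select unprocessed rows, write the output, flag the source row, and commit per row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE subjects (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany("INSERT INTO subjects (url) VALUES (?)", [("a",), ("b",), ("c",)])
# The tracking column starts out false for every source row.
conn.execute("ALTER TABLE subjects ADD COLUMN __downloaded BOOLEAN NOT NULL DEFAULT 0")
conn.execute("CREATE TABLE pages (subject_id INTEGER, html TEXT)")
conn.commit()


def download_pages(url: str):
    """Hypothetical fn: fails on "c" to simulate an interrupted run."""
    if url == "c":
        raise RuntimeError("network error")
    yield {"html": f"<html>{url}</html>"}


def run_once(conn: sqlite3.Connection) -> None:
    pending = conn.execute(
        "SELECT id, url FROM subjects WHERE NOT __downloaded ORDER BY id"
    ).fetchall()
    for row_id, url in pending:
        for out in download_pages(url):
            conn.execute(
                "INSERT INTO pages (subject_id, html) VALUES (?, ?)",
                (row_id, out["html"]),
            )
        conn.execute("UPDATE subjects SET __downloaded = 1 WHERE id = ?", (row_id,))
        # Commit after each source row so finished work survives a crash.
        conn.commit()


try:
    run_once(conn)
except RuntimeError:
    conn.rollback()  # discard the partial work of the failed row

remaining = [r[0] for r in conn.execute("SELECT url FROM subjects WHERE NOT __downloaded")]
saved = conn.execute("SELECT COUNT(*) FROM pages").fetchone()[0]
```

Calling run_once again would retry only the rows still listed in remaining, because the WHERE NOT filter skips everything already flagged.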

id_fields : Optional[list[ralsei.types.IdColumn]] = None

Columns that uniquely identify a row in source_table, so that you can update is_done_column

This argument takes precedence over id_fields inferred from fn’s metadata
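The precedence rule can be pictured with the following sketch. It is not ralsei's implementation: pop_ids, resolve_id_fields and the id_fields attribute are hypothetical names, chosen only to show a wrapper leaving metadata behind and an explicit argument overriding it.

```python
from typing import Callable, Optional


def pop_ids(*names: str) -> Callable:
    """Hypothetical wrapper: strip id fields from the kwargs and record their names."""
    def decorator(fn: Callable) -> Callable:
        def wrapper(**kwargs):
            for name in names:
                kwargs.pop(name)
            return fn(**kwargs)
        wrapper.id_fields = list(names)  # metadata left for later inference
        return wrapper
    return decorator


def resolve_id_fields(fn: Callable, explicit: Optional[list] = None) -> Optional[list]:
    # An explicit argument wins; otherwise fall back to the metadata on fn, if any.
    if explicit is not None:
        return explicit
    return getattr(fn, "id_fields", None)


@pop_ids("id")
def download_pages(url: str):
    yield {"url": url}


inferred = resolve_id_fields(download_pages)
overridden = resolve_id_fields(download_pages, explicit=["subject_id"])
rows = list(download_pages(id=7, url="https://example.com"))
```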