ralsei.task.map_to_new_columns

Module Contents

Classes

MapToNewColumns

Applies the provided map function to each row of a query result and saves the outputs into new columns of that same row

API

class ralsei.task.map_to_new_columns.MapToNewColumns

Bases: ralsei.task.base.TaskDef

Applies the provided map function to each row of a query result and saves the outputs into new columns of that same row

Variables passed to jinja:

Example

import requests
from parsel import Selector
from ralsei import (
    Pipeline,
    MapToNewColumns,
    Table,
    ValueColumn,
    Sql,
    compose_one,
    pop_id_fields,
)

def download(url: str):
    # Fetch the page; the timeout keeps a stalled request from hanging the task
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return {"html": response.text}

def parse(html: str):
    sel = Selector(html)
    return {
        "title": sel.xpath("//h1/text()").get(),
        "rating": sel.xpath("//div[@id='rating']/text()").get(),
    }


class MyPipeline(Pipeline):
    def create_tasks(self):
        return {
            "download": MapToNewColumns(
                table=Table("pages"),
                select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                columns=[
                    ValueColumn("html", "TEXT"),
                    ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                ],
                is_done_column="__downloaded",
                fn=compose_one(download, pop_id_fields("id")),
            ),
            "parse": MapToNewColumns(
                table=self.outputof("download"),
                select="SELECT id, html FROM {{table}}",
                columns=[
                    ValueColumn("title", "TEXT"),
                    ValueColumn("rating", "TEXT"),
                ],
                fn=compose_one(parse, pop_id_fields("id")),
            ),
        }

select : str = None

The SELECT statement that generates the input rows; the fields of each row are passed to fn as arguments

table : ralsei.graph.Resolves[ralsei.types.Table] = None

Table to add columns to

May be the output of another task

columns : Sequence[ralsei.types.ValueColumnBase] = None

List of new columns

Used for ADD COLUMN and UPDATE statement generation.
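As a rough illustration of the kind of statements involved, the sketch below builds ADD COLUMN and UPDATE statements by hand using sqlite3. It is not ralsei's actual SQL generation: the pages table, the NEW_COLUMNS list, and the add_columns and update_row helpers are all assumptions made for this example.

```python
import sqlite3

# Hypothetical column specs as (name, SQL type) pairs; not ralsei's ValueColumn.
NEW_COLUMNS = [("title", "TEXT"), ("rating", "TEXT")]


def add_columns(conn, table, columns):
    """Issue one ALTER TABLE ... ADD COLUMN statement per new column."""
    for name, sql_type in columns:
        conn.execute(f'ALTER TABLE "{table}" ADD COLUMN "{name}" {sql_type}')


def update_row(conn, table, columns, row_id, values):
    """Write the mapped values back into the row they were computed from."""
    assignments = ", ".join(f'"{name}" = :{name}' for name, _ in columns)
    conn.execute(
        f'UPDATE "{table}" SET {assignments} WHERE id = :id',
        {**values, "id": row_id},
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, html TEXT)")
conn.execute("INSERT INTO pages (id, html) VALUES (1, '<h1>Hello</h1>')")

add_columns(conn, "pages", NEW_COLUMNS)
update_row(conn, "pages", NEW_COLUMNS, 1, {"title": "Hello", "rating": "5"})

result = conn.execute("SELECT title, rating FROM pages WHERE id = 1").fetchone()
```

The same column list drives both statements, which is why the names returned by the map function need to match the declared columns.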

fn : ralsei.wrappers.OneToOne = None

Function that maps one row to values of the new columns in the same row

If the id_fields argument is omitted, the task tries to infer id_fields from the metadata left by ralsei.wrappers.pop_id_fields()
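To make the idea of "metadata left by a wrapper" more concrete, here is a minimal pure-Python sketch. The pop_ids helper and its id_fields attribute are hypothetical stand-ins invented for this example; they do not reproduce the signature or internals of ralsei.wrappers.pop_id_fields().

```python
from typing import Any, Callable


def pop_ids(fn: Callable[..., dict], *id_names: str) -> Callable[..., dict]:
    """Hypothetical id-popping wrapper (not ralsei's implementation).

    Removes the id fields from the row before calling fn and records
    which fields were removed so that a caller can inspect them later.
    """

    def wrapper(**row: Any) -> dict:
        for name in id_names:
            row.pop(name)
        return fn(**row)

    # Metadata left on the wrapped function for later inference
    wrapper.id_fields = list(id_names)
    return wrapper


def parse(html: str) -> dict:
    # One input row in, one dict of new column values out
    return {"title": html.upper()}


mapped = pop_ids(parse, "id")
output = mapped(id=1, html="hello")
```

Here parse never sees the id field, while a caller can still read mapped.id_fields to learn which columns identify the row.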

context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'

Task-scoped context-manager arguments passed to fn

Example

from ralsei.contextmanagers import reusable_contextmanager_const
from selenium import webdriver

@reusable_contextmanager_const
def browser_context():
    browser = webdriver.Chrome()
    try:
        yield browser
    finally:
        # Close the browser even if fn raises
        browser.quit()

def scrape_page(browser: webdriver.Chrome):
    ...

MapToNewColumns(
    # table, select and columns omitted for brevity
    fn=scrape_page,
    context={"browser": browser_context},
)

is_done_column : Optional[str] = None

Name of a boolean column to create in table for tracking which rows have been processed

If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note

Make sure to include WHERE NOT {{is_done}} in your select statement
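The stop-and-resume behaviour can be sketched with plain sqlite3. This is only an illustration of the pattern under stated assumptions, not ralsei's implementation: the pages table, the process_pending helper, its fail_on parameter, and fake_download are all made up for the example.

```python
import sqlite3


def process_pending(conn, fn, fail_on=None):
    """Run fn on every unprocessed row, committing after each success."""
    rows = conn.execute(
        "SELECT id, url FROM pages WHERE NOT __downloaded ORDER BY id"
    ).fetchall()
    for row_id, url in rows:
        if row_id == fail_on:
            raise RuntimeError("simulated interruption")
        conn.execute(
            "UPDATE pages SET html = ?, __downloaded = 1 WHERE id = ?",
            (fn(url), row_id),
        )
        conn.commit()  # progress made so far survives a later failure


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, url TEXT, html TEXT)")
conn.executemany(
    "INSERT INTO pages (id, url) VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")]
)
# The tracking column starts out false for every existing row
conn.execute("ALTER TABLE pages ADD COLUMN __downloaded BOOLEAN NOT NULL DEFAULT 0")
conn.commit()

calls = []


def fake_download(url):
    calls.append(url)
    return f"<html>{url}</html>"


# The first run is interrupted at row 2; the second run resumes from there.
try:
    process_pending(conn, fake_download, fail_on=2)
except RuntimeError:
    pass
process_pending(conn, fake_download)
```

Because the SELECT filters on the tracking column, the second run skips row 1 and fn is called exactly once per row across both runs. Without that filter, every run would start over from the first row.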

id_fields : Optional[list[ralsei.types.IdColumn]] = None

Columns that uniquely identify a row in table, so that you can update is_done_column

This argument takes precedence over id_fields inferred from fn’s metadata