ralsei.task.map_to_new_columns
Module Contents
Classes
MapToNewColumns | Applies the provided map function to a query result, saving outputs into new columns on the same row
API
- class ralsei.task.map_to_new_columns.MapToNewColumns
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, saving outputs into new columns on the same row.
Variables passed to jinja:
table=table
is_done=is_done_column (as ralsei.types.Identifier)
Example
    import requests
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewColumns,
        Table,
        ValueColumn,
        Sql,
        compose_one,
        pop_id_fields,
    )

    def download(url: str):
        response = requests.get(url)
        response.raise_for_status()
        return {"html": response.text}

    def parse(html: str):
        sel = Selector(html)
        return {
            "title": sel.xpath("//h1/text()").get(),
            "rating": sel.xpath("//div[@id='rating']/text()").get(),
        }

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "download": MapToNewColumns(
                    table=Table("pages"),
                    select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                    columns=[
                        ValueColumn("html", "TEXT"),
                        ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                    ],
                    is_done_column="__downloaded",
                    fn=compose_one(download, pop_id_fields("id")),
                ),
                "parse": MapToNewColumns(
                    table=self.outputof("download"),
                    select="SELECT id, html FROM {{table}}",
                    columns=[
                        ValueColumn("title", "TEXT"),
                        ValueColumn("rating", "TEXT"),
                    ],
                    fn=compose_one(parse, pop_id_fields("id")),
                ),
            }
- table : ralsei.graph.Resolves[ralsei.types.Table] = None
Table to add columns to.
May be the output of another task.
- columns : Sequence[ralsei.types.ValueColumnBase] = None
List of new columns.
Used for ADD COLUMN and UPDATE statement generation.
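As a rough illustration of what generating these statements from a column list could look like, the sketch below builds one ADD COLUMN statement per column and a single UPDATE statement with named placeholders. ColumnSketch, add_column_statements, and update_statement are hypothetical names invented for this example, and the SQL that ralsei actually emits may differ.

```python
from dataclasses import dataclass


@dataclass
class ColumnSketch:
    """Hypothetical stand-in for a value column: a name and an SQL type."""
    name: str
    sql_type: str


def add_column_statements(table: str, columns: list[ColumnSketch]) -> list[str]:
    # One ALTER TABLE statement per new column
    return [
        f'ALTER TABLE "{table}" ADD COLUMN "{col.name}" {col.sql_type};'
        for col in columns
    ]


def update_statement(table: str, columns: list[ColumnSketch], id_fields: list[str]) -> str:
    # Named placeholders (:name) would be filled from the dict returned by fn
    assignments = ", ".join(f'"{col.name}" = :{col.name}' for col in columns)
    condition = " AND ".join(f'"{field}" = :{field}' for field in id_fields)
    return f'UPDATE "{table}" SET {assignments} WHERE {condition};'


columns = [ColumnSketch("title", "TEXT"), ColumnSketch("rating", "TEXT")]
statements = add_column_statements("pages", columns)
update_sql = update_statement("pages", columns, ["id"])
```

The WHERE clause is built from the id fields, which is why the task needs a way to identify each row uniquely.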
- fn : ralsei.wrappers.OneToOne = None
Function that maps one row to values of the new columns in the same row.
If the id_fields argument is omitted, the task will try to infer id_fields from metadata left by ralsei.wrappers.pop_id_fields().
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'
Task-scoped context-manager arguments passed to fn.
Example
    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver

    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        yield browser
        browser.quit()

    def scrape_page(browser: webdriver.Chrome):
        ...

    MapToNewColumns(
        fn=scrape_page,
        context={"browser": browser_context}
    )
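To make the idea of task-scoped context arguments more concrete, here is a minimal standard-library sketch. It assumes that "task-scoped" means each context manager is entered once, shared across all rows, and closed when the task finishes; that reading is an assumption, not something confirmed here. fake_browser and run_with_context are hypothetical names, and this is not how ralsei itself is implemented.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def fake_browser():
    # Hypothetical resource standing in for something like a webdriver
    events.append("open")
    try:
        yield "browser"
    finally:
        events.append("close")


def scrape_page(url: str, browser: str) -> dict:
    events.append(f"scrape {url}")
    return {"html": f"{browser}:{url}"}


def run_with_context(fn, rows, context):
    with ExitStack() as stack:
        # Enter every context manager once for the whole run
        entered = {name: stack.enter_context(cm) for name, cm in context.items()}
        # Pass the entered values to fn as extra keyword arguments
        return [fn(**row, **entered) for row in rows]


results = run_with_context(
    scrape_page,
    [{"url": "a"}, {"url": "b"}],
    {"browser": fake_browser()},
)
```

The keys of the context dict become keyword argument names of fn, which matches the browser key and browser parameter in the example above.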
- is_done_column : Optional[str] = None
Create a boolean column with the given name in table that tracks which rows have been processed.
If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
Note: Make sure to include WHERE NOT {{is_done}} in your select statement.
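The stop-and-resume behaviour can be sketched with plain sqlite3, independently of ralsei. The table layout, the fake_download function, and the run helper below are assumptions made for this illustration; the point is only the pattern of selecting unprocessed rows, then updating and committing one row at a time.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany("INSERT INTO pages (url) VALUES (?)", [("a",), ("b",), ("c",)])
# Add the output column and the boolean tracking column
conn.execute("ALTER TABLE pages ADD COLUMN html TEXT")
conn.execute("ALTER TABLE pages ADD COLUMN __downloaded BOOLEAN NOT NULL DEFAULT 0")
conn.commit()


def fake_download(url: str) -> dict:
    # Hypothetical fn that fails on "c" to simulate an interrupted run
    if url == "c":
        raise RuntimeError("network error")
    return {"html": f"<h1>{url}</h1>"}


def run(conn, fn):
    # Only rows that are not yet marked as done are selected
    rows = conn.execute(
        "SELECT id, url FROM pages WHERE NOT __downloaded ORDER BY id"
    ).fetchall()
    processed = []
    for row_id, url in rows:
        result = fn(url)
        conn.execute(
            "UPDATE pages SET html = ?, __downloaded = 1 WHERE id = ?",
            (result["html"], row_id),
        )
        conn.commit()  # commit per row so finished work survives a crash
        processed.append(row_id)
    return processed


try:
    run(conn, fake_download)
except RuntimeError:
    pass  # rows 1 and 2 were already committed before the failure

remaining = conn.execute("SELECT id FROM pages WHERE NOT __downloaded").fetchall()
resumed = run(conn, lambda url: {"html": "ok"})
```

Without the WHERE NOT filter, the second run would select and reprocess every row, including those already completed.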
- id_fields : Optional[list[ralsei.types.IdColumn]] = None
Columns that uniquely identify a row in table, so that you can update is_done_column.
This argument takes precedence over id_fields inferred from fn's metadata.
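One way to picture the precedence rule is the sketch below, in which a wrapper records the id field names on the function it returns, and a resolver prefers an explicit argument over that recorded metadata. pop_id_fields_sketch and resolve_id_fields are hypothetical simplifications written for this example; they are not ralsei's pop_id_fields or its internal logic.

```python
from typing import Callable, Optional


def pop_id_fields_sketch(*names: str) -> Callable:
    """Hypothetical decorator: hides id fields from fn, then re-attaches them."""

    def decorator(fn: Callable) -> Callable:
        def wrapper(**row):
            # Remove the id fields so fn only sees its own inputs
            ids = {name: row.pop(name) for name in names}
            # Put the ids back on the result so the row can be located later
            return {**fn(**row), **ids}

        wrapper.id_fields = list(names)  # metadata used for inference
        return wrapper

    return decorator


def resolve_id_fields(fn: Callable, explicit: Optional[list] = None) -> list:
    # An explicit argument always wins over inferred metadata
    if explicit is not None:
        return explicit
    inferred = getattr(fn, "id_fields", None)
    if inferred is None:
        raise ValueError("id_fields must be given or inferable from fn")
    return inferred


@pop_id_fields_sketch("id")
def parse(html: str) -> dict:
    return {"title": html.upper()}


row_result = parse(id=1, html="x")
inferred_fields = resolve_id_fields(parse)
explicit_fields = resolve_id_fields(parse, ["page_id"])
```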