ralsei.task.map_to_new_columns
Module Contents
Classes
MapToNewColumns | Applies the provided map function to a query result, saving outputs into new columns on the same row
API
- class ralsei.task.map_to_new_columns.MapToNewColumns
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, saving outputs into new columns on the same row.
Variables passed to jinja:
  table = table
  is_done = is_done_column (as ralsei.types.Identifier)
Example

    import requests
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewColumns,
        Table,
        ValueColumn,
        Sql,
        compose_one,
        pop_id_fields,
    )

    # Returns the value for the new "html" column
    def download(url: str):
        response = requests.get(url)
        response.raise_for_status()
        return {"html": response.text}

    # Returns the values for the new "title" and "rating" columns
    def parse(html: str):
        sel = Selector(html)
        return {
            "title": sel.xpath("//h1/text()").get(),
            "rating": sel.xpath("//div[@id='rating']/text()").get(),
        }

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "download": MapToNewColumns(
                    table=Table("pages"),
                    # Skip rows that were already processed
                    select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                    columns=[
                        ValueColumn("html", "TEXT"),
                        ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                    ],
                    is_done_column="__downloaded",
                    fn=compose_one(download, pop_id_fields("id")),
                ),
                "parse": MapToNewColumns(
                    # Operates on the table produced by the "download" task
                    table=self.outputof("download"),
                    select="SELECT id, html FROM {{table}}",
                    columns=[
                        ValueColumn("title", "TEXT"),
                        ValueColumn("rating", "TEXT"),
                    ],
                    fn=compose_one(parse, pop_id_fields("id")),
                ),
            }
-
table : ralsei.graph.Resolves[ralsei.types.Table] = None
Table to add columns to.
May be the output of another task.
-
columns : Sequence[ralsei.types.ValueColumnBase] = None
List of new columns.
Used for ADD COLUMN and UPDATE statement generation.
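To make "ADD COLUMN and UPDATE statement generation" more concrete, here is a minimal standard-library sketch of the kind of SQL that could be derived from a list of column definitions. The `ColumnSpec` dataclass and both helper functions are hypothetical stand-ins invented for illustration; they are not ralsei classes, and the SQL that ralsei actually emits may differ.

```python
from dataclasses import dataclass


@dataclass
class ColumnSpec:
    """Hypothetical stand-in for a value column: a name plus an SQL type."""
    name: str
    type: str


def add_column_statements(table: str, columns: list[ColumnSpec]) -> list[str]:
    # One ALTER TABLE statement per new column
    return [
        f'ALTER TABLE "{table}" ADD COLUMN "{c.name}" {c.type}'
        for c in columns
    ]


def update_statement(table: str, columns: list[ColumnSpec], id_fields: list[str]) -> str:
    # Named placeholders (:name) would be filled from the dict returned by the map function
    assignments = ", ".join(f'"{c.name}" = :{c.name}' for c in columns)
    condition = " AND ".join(f'"{f}" = :{f}' for f in id_fields)
    return f'UPDATE "{table}" SET {assignments} WHERE {condition}'


columns = [ColumnSpec("title", "TEXT"), ColumnSpec("rating", "TEXT")]
for statement in add_column_statements("pages", columns):
    print(statement)
print(update_statement("pages", columns, ["id"]))
```

The point of the sketch is that the same column list drives both statements: the schema change that creates the columns and the per-row update that fills them.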
-
fn : ralsei.wrappers.OneToOne = None
Function that maps one row to values of the new columns in the same row.
If the id_fields argument is omitted, the task will try to infer id_fields from the metadata left by ralsei.wrappers.pop_id_fields().
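The idea of a wrapper "leaving metadata" that a task can later read might be pictured as follows. This is a sketch under stated assumptions: `pop_ids`, the `id_fields` function attribute, and `infer_id_fields` are hypothetical names made up for this illustration, and ralsei's own wrappers may store and read this information differently.

```python
from typing import Any, Callable, Optional


def pop_ids(*names: str) -> Callable:
    """Hypothetical decorator: hides id fields from fn, then re-attaches them to its result."""
    def decorator(fn: Callable[..., dict]) -> Callable[..., dict]:
        def wrapper(**row: Any) -> dict:
            ids = {name: row.pop(name) for name in names}
            return {**fn(**row), **ids}
        # Leave metadata on the wrapper so a caller can discover the id fields later
        wrapper.id_fields = list(names)
        return wrapper
    return decorator


@pop_ids("id")
def parse(html: str) -> dict:
    return {"length": len(html)}


def infer_id_fields(fn: Callable, explicit: Optional[list] = None) -> list:
    # An explicitly passed list wins; otherwise fall back to the wrapper's metadata
    if explicit is not None:
        return explicit
    inferred = getattr(fn, "id_fields", None)
    if inferred is None:
        raise ValueError("id_fields was not given and could not be inferred from fn")
    return inferred


print(parse(id=7, html="abc"))
print(infer_id_fields(parse))
print(infer_id_fields(parse, ["url"]))
```

In this sketch the mapped function never sees the `id` value, yet the result still carries it, which is what lets an update target the right row.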
-
context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'
Task-scoped context-manager arguments passed to fn.

Example

    from ralsei import MapToNewColumns
    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver

    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        try:
            yield browser
        finally:
            # Close the browser even if the task fails
            browser.quit()

    def scrape_page(browser: webdriver.Chrome):
        ...

    # Other arguments (table, select, columns) are omitted here
    MapToNewColumns(
        fn=scrape_page,
        context={"browser": browser_context}
    )
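As a rough mental model of "task-scoped" context managers, the sketch below enters each context manager once, passes the yielded values to the mapped function as keyword arguments for every row, and exits them when the task finishes. It uses only `contextlib` from the standard library. `run_task`, `fake_browser`, and the `events` list are hypothetical illustrations and do not describe how ralsei implements this internally.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def fake_browser():
    # Stand-in for an expensive resource such as a browser session
    events.append("open")
    try:
        yield "browser-handle"
    finally:
        events.append("close")


def scrape_page(url: str, browser: str) -> dict:
    events.append(f"scrape {url}")
    return {"html": f"{browser}:{url}"}


def run_task(fn, rows, context):
    results = []
    with ExitStack() as stack:
        # Enter each context manager once for the whole task
        extra = {name: stack.enter_context(cm()) for name, cm in context.items()}
        for row in rows:
            # Every call receives the row's fields plus the shared resources
            results.append(fn(**row, **extra))
    return results


results = run_task(scrape_page, [{"url": "a"}, {"url": "b"}], {"browser": fake_browser})
print(events)
print(results)
```

Sharing one resource across all rows avoids paying the setup cost, such as launching a browser, for every row.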
-
is_done_column : Optional[str] = None
Create a boolean column with the given name in table that tracks which rows have been processed.
If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
Note: Make sure to include WHERE NOT {{is_done}} in your select statement.
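The stop-and-resume behaviour described above can be imitated with plain `sqlite3`. This is a sketch of the general pattern only: a tracking column, a select that skips finished rows, and a commit after every row. The table layout, the `run` helper, and the simulated failure are assumptions made for the demonstration, not ralsei code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany("INSERT INTO pages (url) VALUES (?)", [("a",), ("b",), ("c",)])
# The new value column plus a boolean tracking column
conn.execute("ALTER TABLE pages ADD COLUMN html TEXT")
conn.execute("ALTER TABLE pages ADD COLUMN __downloaded BOOLEAN NOT NULL DEFAULT 0")
conn.commit()


def run(conn, fn):
    # Only rows that are not yet marked as done are selected
    rows = conn.execute(
        "SELECT id, url FROM pages WHERE NOT __downloaded ORDER BY id"
    ).fetchall()
    for row_id, url in rows:
        html = fn(url)
        conn.execute(
            "UPDATE pages SET html = ?, __downloaded = 1 WHERE id = ?",
            (html, row_id),
        )
        conn.commit()  # commit per row so finished work survives a crash


def flaky_download(url):
    # Fails on one row to simulate an interruption
    if url == "b":
        raise RuntimeError("network error")
    return f"<h1>{url}</h1>"


second_run = []


def working_download(url):
    second_run.append(url)
    return f"<h1>{url}</h1>"


try:
    run(conn, flaky_download)
except RuntimeError:
    pass  # row "a" is already committed; "b" and "c" are still pending

run(conn, working_download)
done = conn.execute("SELECT COUNT(*) FROM pages WHERE __downloaded").fetchone()[0]
print(second_run, done)
```

Without the `WHERE NOT __downloaded` filter, the second run would redo row "a" as well, which is why the note above asks for the equivalent condition in the select statement.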
-
id_fields : Optional[list[ralsei.types.IdColumn]] = None
Columns that uniquely identify a row in table, so that you can update is_done_column.
This argument takes precedence over id_fields inferred from fn's metadata.