ralsei.task.map_to_new_columns
Module Contents
Classes
| MapToNewColumns | Applies the provided map function to a query result, saving outputs into new columns on the same row |
API
class ralsei.task.map_to_new_columns.MapToNewColumns
Bases: ralsei.task.base.TaskDef

Applies the provided map function to a query result, saving outputs into new columns on the same row

Variables passed to jinja:
- table = table
- is_done = is_done_column (as ralsei.types.Identifier)
Example

    import requests
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewColumns,
        Table,
        ValueColumn,
        Sql,
        compose_one,
        pop_id_fields,
    )


    def download(url: str):
        response = requests.get(url)
        response.raise_for_status()
        return {"html": response.text}


    def parse(html: str):
        sel = Selector(html)
        return {
            "title": sel.xpath("//h1/text()").get(),
            "rating": sel.xpath("//div[@id='rating']/text()").get(),
        }


    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "download": MapToNewColumns(
                    table=Table("pages"),
                    select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                    columns=[
                        ValueColumn("html", "TEXT"),
                        ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                    ],
                    is_done_column="__downloaded",
                    fn=compose_one(download, pop_id_fields("id")),
                ),
                "parse": MapToNewColumns(
                    table=self.outputof("download"),
                    select="SELECT id, html FROM {{table}}",
                    columns=[
                        ValueColumn("title", "TEXT"),
                        ValueColumn("rating", "TEXT"),
                    ],
                    fn=compose_one(parse, pop_id_fields("id")),
                ),
            }

table : ralsei.graph.Resolves[ralsei.types.Table] = None
Table to add columns to. May be the output of another task.
columns : Sequence[ralsei.types.ValueColumnBase] = None
List of new columns. Used for ADD COLUMN and UPDATE statement generation.
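
To make the role of the column list more concrete, the following is a minimal sketch of the kind of ADD COLUMN and UPDATE statements such a list could translate into. It runs against an in-memory SQLite database from the standard library. The `build_statements` helper and the `(name, type)` tuples are assumptions made for illustration and are not ralsei's actual SQL generation, which may differ in quoting, dialect, and handling of column value expressions.

```python
import sqlite3

# Hypothetical stand-in for a column list: (name, SQL type) pairs.
COLUMNS = [("title", "TEXT"), ("rating", "TEXT")]


def build_statements(table: str, columns: list[tuple[str, str]], id_field: str):
    """Return (list of ADD COLUMN statements, one parameterised UPDATE statement)."""
    add_columns = [
        f'ALTER TABLE "{table}" ADD COLUMN "{name}" {sql_type}'
        for name, sql_type in columns
    ]
    assignments = ", ".join(f'"{name}" = :{name}' for name, _ in columns)
    update = f'UPDATE "{table}" SET {assignments} WHERE "{id_field}" = :{id_field}'
    return add_columns, update


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, html TEXT)")
conn.execute("INSERT INTO pages (id, html) VALUES (1, '<h1>Hello</h1>')")

add_columns, update = build_statements("pages", COLUMNS, "id")

# First the new columns are added to the existing table...
for statement in add_columns:
    conn.execute(statement)

# ...then each mapped row is written back into the row it came from.
conn.execute(update, {"id": 1, "title": "Hello", "rating": "5"})
row = conn.execute("SELECT title, rating FROM pages WHERE id = 1").fetchone()
```

The point of the sketch is that one column list drives both statements, so the names in the list have to match the keys returned by the map function.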
fn : ralsei.wrappers.OneToOne = None
Function that maps one row to values of the new columns in the same row. If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
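
As a rough illustration of how a wrapper can both hide the id fields from the mapping function and leave metadata behind for later inference, here is a self-contained sketch. `pop_ids_sketch` and the `sketch_id_fields` attribute are hypothetical names invented for this example; they are not ralsei's implementation of `pop_id_fields`, whose internals are not described here.

```python
from typing import Any, Callable

RowFn = Callable[..., dict[str, Any]]


def pop_ids_sketch(*id_fields: str) -> Callable[[RowFn], RowFn]:
    """Hypothetical decorator factory, loosely modelled on the description above."""

    def decorator(fn: RowFn) -> RowFn:
        def wrapper(**row: Any) -> dict[str, Any]:
            # Remove the id fields so the inner function only sees its own inputs.
            ids = {name: row.pop(name) for name in id_fields}
            # Re-attach the ids so the caller knows which row the output belongs to.
            return {**fn(**row), **ids}

        # Leave metadata behind so a task could infer the id fields later.
        wrapper.sketch_id_fields = list(id_fields)  # hypothetical attribute name
        return wrapper

    return decorator


def parse(html: str) -> dict[str, Any]:
    return {"length": len(html)}


wrapped = pop_ids_sketch("id")(parse)
result = wrapped(id=7, html="<h1>Hi</h1>")
inferred = getattr(wrapped, "sketch_id_fields", None)
```

In this sketch `parse` never receives `id`, yet the returned dictionary still carries it, which is what lets a task match the output back to the originating row.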
context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'
Task-scoped context-manager arguments passed to fn

Example

    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver


    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        yield browser
        browser.quit()


    def scrape_page(browser: webdriver.Chrome):
        ...


    MapToNewColumns(
        fn=scrape_page,
        context={"browser": browser_context}
    )
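
The idea of task-scoped context managers can be sketched with the standard library alone: each context manager is entered once for the whole task, its value is passed to the function as a keyword argument for every row, and it is closed when the task ends. The `run_task` helper and `fake_browser` below are hypothetical and only mimic this behaviour; how ralsei enters and reuses its context managers is an assumption here.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def fake_browser():
    """Stand-in for an expensive resource such as a browser session."""
    events.append("open")
    try:
        yield "browser"
    finally:
        events.append("close")


def scrape_page(url: str, browser: str) -> dict:
    return {"html": f"{browser}:{url}"}


def run_task(fn, rows, context):
    """Hypothetical runner: enter every context manager once, reuse it per row."""
    with ExitStack() as stack:
        entered = {name: stack.enter_context(cm()) for name, cm in context.items()}
        return [fn(**row, **entered) for row in rows]


results = run_task(
    scrape_page,
    [{"url": "a"}, {"url": "b"}],
    {"browser": fake_browser},
)
```

Two rows are processed, but the resource is opened and closed only once, which is the main reason to scope it to the task rather than to each call.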
is_done_column : Optional[str] = None
Create a boolean column with the given name in table that tracks which rows have been processed. If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.

Note: Make sure to include WHERE NOT {{is_done}} in your select statement
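
The stop-and-resume behaviour described above can be sketched with an in-memory SQLite database. The table, the `run` helper, and its `limit` parameter (used only to simulate an interrupted run) are assumptions for illustration and do not reflect how ralsei itself is implemented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany("INSERT INTO pages (url) VALUES (?)", [("a",), ("b",), ("c",)])
# Tracking column, analogous to is_done_column="__downloaded".
conn.execute("ALTER TABLE pages ADD COLUMN __downloaded BOOLEAN NOT NULL DEFAULT 0")
conn.execute("ALTER TABLE pages ADD COLUMN html TEXT")
conn.commit()


def run(fn, limit=None):
    """Process unfinished rows, committing after each one; return rows handled."""
    pending = conn.execute(
        "SELECT id, url FROM pages WHERE NOT __downloaded ORDER BY id"
    ).fetchall()
    handled = 0
    for row_id, url in pending[:limit]:
        html = fn(url)
        conn.execute(
            "UPDATE pages SET html = ?, __downloaded = 1 WHERE id = ?",
            (html, row_id),
        )
        conn.commit()  # progress survives even if the next row fails
        handled += 1
    return handled


first = run(str.upper, limit=2)  # simulate a run that stops after two rows
second = run(str.upper)          # the next run picks up only the remaining row
done = conn.execute("SELECT html FROM pages ORDER BY id").fetchall()
```

Without the `WHERE NOT __downloaded` filter the second run would reprocess all three rows, which is why the note about the select statement matters.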
id_fields : Optional[list[ralsei.types.IdColumn]] = None
Columns that uniquely identify a row in table, so that you can update is_done_column. This argument takes precedence over id_fields inferred from fn’s metadata.
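
The precedence rule can be pictured as a small resolution step: an explicit argument wins, and function metadata is only consulted when nothing was passed. The `resolve_id_fields` function, the `sketch_id_fields` attribute, and the use of plain strings in place of `ralsei.types.IdColumn` are all hypothetical simplifications for this sketch.

```python
from typing import Callable, Optional


def resolve_id_fields(explicit: Optional[list[str]], fn: Callable) -> list[str]:
    """Hypothetical resolution order: explicit argument first, then fn metadata."""
    if explicit is not None:
        return explicit
    inferred = getattr(fn, "sketch_id_fields", None)  # hypothetical attribute name
    if inferred is None:
        raise ValueError("id_fields must be given or inferable from fn")
    return inferred


def mapper(**row):
    return row


# Pretend a wrapper left this metadata on the function.
mapper.sketch_id_fields = ["id"]

from_metadata = resolve_id_fields(None, mapper)
from_argument = resolve_id_fields(["page_id"], mapper)
```

Here `from_metadata` falls back to the value stored on the function, while `from_argument` ignores it because an explicit list was supplied.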