ralsei.task.map_to_new_table
Module Contents
Classes
MapToNewTable | Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
API
- class ralsei.task.map_to_new_table.MapToNewTable
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
Variables passed to jinja:
- table=table
- source=source_table
- is_done=is_done_column (as ralsei.types.Identifier)
Example

    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewTable,
        Table,
        ValueColumn,
        Placeholder,
        compose,
        add_to_input,
        pop_id_fields,
    )

    # NOTE: download() is not defined in this example; it is assumed to be
    # a user-supplied helper that fetches a URL and returns the page HTML.

    # Find subjects on the hub page
    def find_subjects(hub_url: str):
        html = download(hub_url)
        sel = Selector(html)
        for row in sel.xpath("//table/tr"):
            yield {
                "subject": row.xpath("a/text()").get(),
                "url": row.xpath("a/@href").get()
            }

    # Download all pages in a subject rating
    def download_pages(url: str):
        next_url = url
        page = 1
        while next_url is not None:
            html = download(next_url)
            yield { "page": page, "html": html }
            sel = Selector(html)
            # Select the href attribute, not the whole <a> element
            next_url = sel.xpath("//a[@id = 'next']/@href").get()
            page += 1

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "subjects": MapToNewTable(
                    table=Table("subjects"),
                    columns=[ # (1)
                        "id SERIAL PRIMARY KEY",
                        ValueColumn("subject", "TEXT"),
                        ValueColumn("url", "TEXT"),
                    ],
                    fn=compose(
                        find_subjects,
                        add_to_input(hub_url="https://rating.com/2022")
                    )
                ),
                "pages": MapToNewTable(
                    source_table=self.outputof("subjects"),
                    select="""\
                    SELECT id, url FROM {{source}}
                    WHERE NOT {{is_done}}""",
                    table=Table("pages"),
                    columns=[
                        ValueColumn(
                            "subject_id",
                            "INT REFERENCES {{source}}(id)",
                            Placeholder("id")
                        ),
                        ValueColumn("page", "INT"),
                        ValueColumn("html", "TEXT"),
                        "date_downloaded DATE DEFAULT NOW()",
                    ],
                    is_done_column="__downloaded",
                    fn=compose(download_pages, pop_id_fields("id"))
                )
            }

(1) The table body is generated from all columns; the INSERT statement is generated only from ralsei.types.ValueColumnBase columns.
- table : ralsei.types.Table = None
  The new table being created
- columns : Sequence[str | ralsei.types.ValueColumnBase] = None
  Columns (and constraints) that make up the table definition
  Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation.
  str columns and ralsei.types.ValueColumn’s type are passed through the jinja renderer.
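The split between the table definition and the INSERT statement can be sketched in plain Python. This is only an illustration under stated assumptions: it does not use ralsei, the names `ValueCol`, `create_table_sql` and `insert_sql` are hypothetical, and the SQL that ralsei actually emits may differ.

```python
from dataclasses import dataclass
from typing import Sequence, Union


@dataclass
class ValueCol:
    """Hypothetical stand-in for a value column: name, SQL type, and
    the key of the output row that supplies the value."""
    name: str
    sql_type: str
    value_key: str = ""

    def __post_init__(self):
        # By default the value comes from the row key named like the column
        if not self.value_key:
            self.value_key = self.name


Column = Union[str, ValueCol]


def create_table_sql(table: str, columns: Sequence[Column]) -> str:
    # Every entry contributes to the table body, raw strings included
    parts = [
        c if isinstance(c, str) else f"{c.name} {c.sql_type}"
        for c in columns
    ]
    return f"CREATE TABLE {table} ({', '.join(parts)})"


def insert_sql(table: str, columns: Sequence[Column]) -> str:
    # Only value columns appear in the INSERT statement
    value_cols = [c for c in columns if isinstance(c, ValueCol)]
    names = ", ".join(c.name for c in value_cols)
    params = ", ".join(f":{c.value_key}" for c in value_cols)
    return f"INSERT INTO {table} ({names}) VALUES ({params})"


columns = [
    "id INTEGER PRIMARY KEY",
    ValueCol("subject", "TEXT"),
    ValueCol("url", "TEXT"),
]
create_stmt = create_table_sql("subjects", columns)
insert_stmt = insert_sql("subjects", columns)
```

In this sketch the raw string column shapes the table but never appears in the INSERT, which is why a column such as an auto-generated id can be declared without the map function having to supply it.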
- fn : ralsei.wrappers.OneToMany = None
  A generator function, mapping one row to many rows
  If the id_fields argument is omitted, the task will try to infer id_fields from metadata left by ralsei.wrappers.pop_id_fields()
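In the class example, `download_pages` only accepts `url` even though the query selects `id` and `url`, and the `id` later reappears in the output through `Placeholder("id")`. A wrapper with that effect could look roughly like the sketch below. This is an assumption about the general idea, not ralsei's implementation; `pop_fields_sketch` and the `id_fields` attribute it sets are hypothetical.

```python
from typing import Any, Callable, Iterator


def pop_fields_sketch(*id_fields: str):
    """Hypothetical wrapper factory: strip id fields from the input,
    then attach them to every row the inner generator yields."""

    def decorator(fn: Callable[..., Iterator[dict[str, Any]]]):
        def wrapper(**kwargs: Any) -> Iterator[dict[str, Any]]:
            # Remove the id fields so the inner function never sees them
            ids = {name: kwargs.pop(name) for name in id_fields}
            for row in fn(**kwargs):
                # Re-attach the ids so each output row can reference its source
                yield {**row, **ids}

        # Leave metadata behind so a caller could infer the id fields
        wrapper.id_fields = list(id_fields)
        return wrapper

    return decorator


def split_words(text: str) -> Iterator[dict[str, Any]]:
    # One input row maps to many output rows
    for position, word in enumerate(text.split()):
        yield {"position": position, "word": word}


wrapped = pop_fields_sketch("id")(split_words)
rows = list(wrapped(id=7, text="one to many"))
```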
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'
  Task-scoped context-manager arguments passed to fn
  Example

      from ralsei import MapToNewTable
      from ralsei.contextmanagers import reusable_contextmanager_const
      from selenium import webdriver

      @reusable_contextmanager_const
      def browser_context():
          browser = webdriver.Chrome()
          yield browser
          browser.quit()

      def scrape_page(browser: webdriver.Chrome):
          ...

      MapToNewTable(
          fn=scrape_page,
          context={"browser": browser_context}
      )
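The idea of a task-scoped resource can be illustrated with the standard library alone. The sketch below assumes that each context manager is entered once for the whole task and that its value is passed to `fn` as a keyword argument on every call; `run_task` is a hypothetical driver, not part of ralsei.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def resource_context():
    # Set up once, before any rows are processed
    events.append("open")
    try:
        yield "resource"
    finally:
        # Tear down once, even if fn raises
        events.append("close")


def fn(resource: str, item: int):
    events.append(f"{resource}:{item}")
    yield {"item": item}


def run_task(fn, context, inputs):
    """Hypothetical runner: enter every context manager once and pass
    the yielded values to fn as extra keyword arguments."""
    out = []
    with ExitStack() as stack:
        extra = {key: stack.enter_context(make()) for key, make in context.items()}
        for kwargs in inputs:
            out.extend(fn(**extra, **kwargs))
    return out


results = run_task(fn, {"resource": resource_context}, [{"item": 1}, {"item": 2}])
```

Sharing one resource across all calls is what makes this pattern attractive for expensive objects such as a browser session.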
- select : Optional[str] = None
  The SELECT statement that generates rows passed to fn as arguments
  If not specified, fn will only run once with 0 arguments.
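The two modes can be sketched with `sqlite3` from the standard library. This assumes that each selected row is passed to `fn` as keyword arguments named after the selected columns; `run_fn` is a hypothetical driver, and ralsei's own execution logic is not shown here.

```python
import sqlite3


def run_fn(conn, fn, select=None):
    """Hypothetical driver: call fn once per selected row, passing the
    row's columns as keyword arguments, or once with no arguments."""
    if select is None:
        return list(fn())
    conn.row_factory = sqlite3.Row
    out = []
    for row in conn.execute(select):
        out.extend(fn(**dict(row)))
    return out


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE subjects (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany("INSERT INTO subjects (url) VALUES (?)", [("a",), ("b",)])


def pages(id: int, url: str):
    # Each input row produces two output rows
    yield {"subject_id": id, "page": 1}
    yield {"subject_id": id, "page": 2}


def seed():
    # No select statement: called exactly once, with no arguments
    yield {"subject": "math"}


with_select = run_fn(conn, pages, "SELECT id, url FROM subjects ORDER BY id")
without_select = run_fn(conn, seed)
```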
- source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None
  The table where the input rows come from
  If not creating is_done_column, you can leave it as None.
  May be the output of another task.
- is_done_column : Optional[str] = None
  Create a boolean column with the given name in source_table that tracks which rows have been processed
  If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
  Note
  Make sure to include WHERE NOT {{is_done}} in your select statement
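The stop-and-resume behaviour can be approximated with `sqlite3`. The sketch below is an assumption-laden illustration, not ralsei's code: the `fetch` function, the `fail_on` parameter and the `run` driver are hypothetical, and the tracking column is a plain 0/1 integer. It marks a source row as done and commits only after `fn` has finished for that row, so a later run skips completed rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE subjects (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany("INSERT INTO subjects (url) VALUES (?)", [("a",), ("b",), ("c",)])
conn.execute("CREATE TABLE pages (subject_id INTEGER, html TEXT)")
# The tracking column lives in the source table and starts out false (0)
conn.execute("ALTER TABLE subjects ADD COLUMN __downloaded INTEGER NOT NULL DEFAULT 0")
conn.commit()


def fetch(url: str, fail_on: str = ""):
    # Hypothetical map function that can simulate a crash on one URL
    if url == fail_on:
        raise RuntimeError("simulated crash")
    yield {"html": f"<html>{url}</html>"}


def run(conn, fail_on=""):
    rows = conn.execute(
        "SELECT id, url FROM subjects WHERE NOT __downloaded ORDER BY id"
    ).fetchall()
    processed = []
    for row_id, url in rows:
        for out in fetch(url, fail_on):
            conn.execute("INSERT INTO pages VALUES (?, ?)", (row_id, out["html"]))
        # The id column identifies which source row to mark as done
        conn.execute("UPDATE subjects SET __downloaded = 1 WHERE id = ?", (row_id,))
        conn.commit()  # commit per input row, so progress survives a crash
        processed.append(row_id)
    return processed


try:
    run(conn, fail_on="b")  # first run stops partway through
except RuntimeError:
    conn.rollback()
resumed = run(conn)  # second run picks up only the unfinished rows
page_count = conn.execute("SELECT COUNT(*) FROM pages").fetchone()[0]
```

Without the `WHERE NOT __downloaded` filter, the second run would reprocess the first row and insert its page twice.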
- id_fields : Optional[list[ralsei.types.IdColumn]] = None
  Columns that uniquely identify a row in source_table, so that you can update is_done_column
  This argument takes precedence over id_fields inferred from fn’s metadata