ralsei.task.map_to_new_table
Module Contents
Classes
MapToNewTable | Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
API
- class ralsei.task.map_to_new_table.MapToNewTable
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
Variables passed to jinja:
- table=table
- source=source_table
- is_done=is_done_column (as ralsei.types.Identifier)
Example
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewTable,
        Table,
        ValueColumn,
        Placeholder,
        compose,
        add_to_input,
        pop_id_fields,
    )

    # download(url) is assumed to be a user-supplied helper returning the page HTML

    # Find subjects on the hub page
    def find_subjects(hub_url: str):
        html = download(hub_url)
        sel = Selector(html)
        for row in sel.xpath("//table/tr"):
            yield {
                "subject": row.xpath("a/text()").get(),
                "url": row.xpath("a/@href").get()
            }

    # Download all pages in a subject rating
    def download_pages(url: str):
        next_url = url
        page = 1
        while next_url is not None:
            html = download(next_url)
            yield { "page": page, "html": html }
            sel = Selector(html)
            # Follow the link's href, not the serialized <a> element
            next_url = sel.xpath("//a[@id = 'next']/@href").get()
            page += 1

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "subjects": MapToNewTable(
                    table=Table("subjects"),
                    columns=[ # (1)
                        "id SERIAL PRIMARY KEY",
                        ValueColumn("subject", "TEXT"),
                        ValueColumn("url", "TEXT"),
                    ],
                    fn=compose(
                        find_subjects,
                        add_to_input(hub_url="https://rating.com/2022")
                    )
                ),
                "pages": MapToNewTable(
                    source_table=self.outputof("subjects"),
                    select="""\
                    SELECT id, url FROM {{source}}
                    WHERE NOT {{is_done}}""",
                    table=Table("pages"),
                    columns=[
                        ValueColumn(
                            "subject_id",
                            "INT REFERENCES {{source}}(id)",
                            Placeholder("id")
                        ),
                        ValueColumn("page", "INT"),
                        ValueColumn("html", "TEXT"),
                        "date_downloaded DATE DEFAULT NOW()",
                    ],
                    is_done_column="__downloaded",
                    fn=compose(download_pages, pop_id_fields("id"))
                )
            }
(1) The table body is generated from all columns; the INSERT statement is generated only from ralsei.types.ValueColumnBase entries.
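The split between the table body and the INSERT statement can be sketched in plain Python. This is only an illustration of the rule stated above, not ralsei's implementation: `SketchValueColumn` and `build_statements` are hypothetical names, and the `:name` parameter style is an assumption.

```python
from dataclasses import dataclass


@dataclass
class SketchValueColumn:
    """Hypothetical stand-in for a value column (name plus SQL type)."""
    name: str
    sql_type: str

    @property
    def definition(self) -> str:
        return f"{self.name} {self.sql_type}"


def build_statements(table: str, columns: list):
    # Table body: every entry contributes; plain strings are used verbatim
    body = ",\n    ".join(
        c if isinstance(c, str) else c.definition for c in columns
    )
    create = f"CREATE TABLE {table} (\n    {body}\n);"

    # INSERT statement: only value columns contribute
    value_cols = [c for c in columns if not isinstance(c, str)]
    names = ", ".join(c.name for c in value_cols)
    params = ", ".join(f":{c.name}" for c in value_cols)
    insert = f"INSERT INTO {table} ({names}) VALUES ({params});"
    return create, insert


create_sql, insert_sql = build_statements(
    "subjects",
    [
        "id SERIAL PRIMARY KEY",
        SketchValueColumn("subject", "TEXT"),
        SketchValueColumn("url", "TEXT"),
    ],
)
print(create_sql)
print(insert_sql)
```

In this sketch the `id SERIAL PRIMARY KEY` string appears in the table definition but never in the INSERT, so the database fills it in on its own.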
- table : ralsei.types.Table = None
  The new table being created
- columns : Sequence[str | ralsei.types.ValueColumnBase] = None
  Columns (and constraints) that make up the table definition
  Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation.
  str columns and the type of each ralsei.types.ValueColumn are passed through the jinja renderer.
- fn : ralsei.wrappers.OneToMany = None
  A generator function, mapping one row to many rows
  If the id_fields argument is omitted, the task will try to infer id_fields from metadata left by ralsei.wrappers.pop_id_fields()
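To make the idea of "metadata left by a wrapper" concrete, here is a minimal sketch of a wrapper in the spirit of pop_id_fields. It is a hypothetical stand-in, not ralsei's code: the name `sketch_pop_id_fields`, the `id_fields` attribute on the wrapper, and the behaviour of re-adding the id values to each output row are all assumptions (the last one inferred from the `Placeholder("id")` column in the class example).

```python
from typing import Any, Callable, Iterator


def sketch_pop_id_fields(*id_fields: str) -> Callable:
    """Hypothetical stand-in for an id-popping wrapper."""

    def decorator(fn: Callable[..., Iterator[dict]]) -> Callable[..., Iterator[dict]]:
        def wrapper(**kwargs: Any) -> Iterator[dict]:
            # Remove the id fields before calling the inner function
            ids = {name: kwargs.pop(name) for name in id_fields}
            for row in fn(**kwargs):
                # Assumption: id values are merged back into every output row
                yield {**row, **ids}

        # Metadata that a task could inspect to infer id_fields
        wrapper.id_fields = list(id_fields)
        return wrapper

    return decorator


def download_pages(url: str):
    # Fake downloader so the sketch runs without network access
    for page in (1, 2):
        yield {"page": page, "html": f"<html>{url}?page={page}</html>"}


wrapped = sketch_pop_id_fields("id")(download_pages)
rows = list(wrapped(id=7, url="https://example.com"))
print(wrapped.id_fields)
print(rows)
```

Note that `download_pages` itself never sees `id`; the wrapper carries it around the call.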
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'
  Task-scoped context-manager arguments passed to fn
  Example
      from ralsei import MapToNewTable
      from ralsei.contextmanagers import reusable_contextmanager_const
      from selenium import webdriver

      @reusable_contextmanager_const
      def browser_context():
          browser = webdriver.Chrome()
          yield browser
          browser.quit()

      def scrape_page(browser: webdriver.Chrome):
          ...

      MapToNewTable(
          fn=scrape_page,
          context={"browser": browser_context}
      )
- select : Optional[str] = None
  The SELECT statement that generates rows passed to fn as arguments
  If not specified, fn will only run once with 0 arguments.
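The two modes described for select can be sketched as follows. `run_fn` is a hypothetical helper written for illustration, and passing each row's columns as keyword arguments is an assumption; the source only says rows are passed "as arguments".

```python
from typing import Callable, Iterable, Iterator, Optional


def run_fn(
    fn: Callable[..., Iterator[dict]],
    rows: Optional[Iterable[dict]] = None,
) -> Iterator[dict]:
    if rows is None:
        # No select: fn runs exactly once with no arguments
        yield from fn()
        return
    for row in rows:
        # With select: fn runs once per input row
        # (assumption: columns become keyword arguments)
        yield from fn(**row)


def find_subjects():
    yield {"subject": "math"}
    yield {"subject": "physics"}


def download_pages(id: int, url: str):
    yield {"subject_id": id, "html": f"<html>{url}</html>"}


once = list(run_fn(find_subjects))
per_row = list(
    run_fn(download_pages, rows=[{"id": 1, "url": "a"}, {"id": 2, "url": "b"}])
)
print(once)
print(per_row)
```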
- source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None
  The table where the input rows come from
  If not creating is_done_column, you can leave it as None.
  May be the output of another task.
- is_done_column : Optional[str] = None
  Create a boolean column with the given name in source_table that tracks which rows have been processed
  If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
  Note
  Make sure to include WHERE NOT {{is_done}} in your select statement
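The stop-and-resume behaviour can be illustrated with the standard library's sqlite3 module. This is a sketch of the general pattern (tracking column, filtered SELECT, commit per row), not ralsei's implementation; the table layout, `process_pending`, `fake_download`, and the `limit` parameter used to simulate an interruption are all hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE subjects (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany("INSERT INTO subjects (url) VALUES (?)", [("a",), ("b",), ("c",)])
conn.execute("CREATE TABLE pages (subject_id INTEGER, html TEXT)")
# Tracking column on the source table, initially false for every row
conn.execute(
    "ALTER TABLE subjects ADD COLUMN __downloaded BOOLEAN NOT NULL DEFAULT 0"
)
conn.commit()


def fake_download(url):
    # Stand-in for a real map function; yields one output row per input row
    yield {"html": f"<html>{url}</html>"}


def process_pending(conn, fn, limit=None):
    """Process rows not yet marked done; `limit` simulates an interruption."""
    pending = conn.execute(
        "SELECT id, url FROM subjects WHERE NOT __downloaded"
    ).fetchall()
    processed = 0
    for row_id, url in pending:
        if limit is not None and processed >= limit:
            break
        for out in fn(url):
            conn.execute(
                "INSERT INTO pages (subject_id, html) VALUES (?, ?)",
                (row_id, out["html"]),
            )
        conn.execute("UPDATE subjects SET __downloaded = 1 WHERE id = ?", (row_id,))
        # Commit after each input row so finished work survives a stop
        conn.commit()
        processed += 1
    return processed


first_run = process_pending(conn, fake_download, limit=2)  # "interrupted" early
second_run = process_pending(conn, fake_download)          # resumes with the rest
page_count = conn.execute("SELECT COUNT(*) FROM pages").fetchone()[0]
print(first_run, second_run, page_count)
```

Without the `WHERE NOT __downloaded` filter, the second run would reprocess the rows that were already handled and insert duplicate pages.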
- id_fields : Optional[list[ralsei.types.IdColumn]] = None
  Columns that uniquely identify a row in source_table, so that you can update is_done_column
  This argument takes precedence over id_fields inferred from fn's metadata