ralsei.task
Package Contents
Classes
Task | Base task class
TaskImpl | Task implementation created from TaskDef arguments
TaskDef | Stores task arguments before said task is created
CreateTableSql | Runs a CREATE TABLE SQL script
AddColumnsSql | Adds the specified Columns to an existing Table and runs the SQL script to fill them with data
MapToNewTable | Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
MapToNewColumns | Applies the provided map function to a query result, saving outputs into new columns on the same row
CreateTableTask | Base class for a task that performs table creation
AddColumnsTask | Base class for a task that adds columns to a table
Data
ROW_CONTEXT_ATRRIBUTE | Attribute that gets added to exceptions that occurred in a row processing context.
ROW_CONTEXT_VAR | ContextVar storing the popped fields of the currently processed row (used by ralsei.console.console for logging purposes)
API
- class ralsei.task.Task
Bases: abc.ABC
Base task class
- abstract run(conn: ralsei.connection.ConnectionExt)
Run the task
- abstract delete(conn: ralsei.connection.ConnectionExt)
Delete whatever run() has created
- redo(conn: ralsei.connection.ConnectionExt)
- abstract property output : Any
Object created or modified by this task (usually a ralsei.types.Table)
Used for resolving ralsei.graph.Pipeline.outputof()
- abstract exists(conn: ralsei.connection.ConnectionExt) → bool
Check if task has already been done
- scripts() → collections.abc.Iterable[tuple[str, object]]
Get SQL scripts rendered by this task
- Returns:
iterable of ("name", script), where script is either:
- a string-like object, usually str or sqlalchemy.sql.elements.TextClause
- a list of string-like objects (in case of multiple statements)
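The Task contract can be illustrated with a small stand-in. This is a minimal sketch, not ralsei code: `TaskStandIn` and `CreateNumbers` are hypothetical names, a plain `sqlite3.Connection` replaces `ralsei.connection.ConnectionExt`, and `redo()` is assumed to mean "delete, then run again", since its behavior is not documented here.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Any


class TaskStandIn(ABC):
    """Hypothetical stand-in mirroring the abstract interface of ralsei.task.Task."""

    @abstractmethod
    def run(self, conn: sqlite3.Connection) -> None: ...

    @abstractmethod
    def delete(self, conn: sqlite3.Connection) -> None: ...

    @abstractmethod
    def exists(self, conn: sqlite3.Connection) -> bool: ...

    @property
    @abstractmethod
    def output(self) -> Any: ...

    def redo(self, conn: sqlite3.Connection) -> None:
        # Assumption: redo undoes the previous run, then runs again
        self.delete(conn)
        self.run(conn)


class CreateNumbers(TaskStandIn):
    """Creates and fills a small table called `numbers`."""

    @property
    def output(self) -> str:
        return "numbers"

    def run(self, conn: sqlite3.Connection) -> None:
        conn.execute("CREATE TABLE numbers (n INTEGER)")
        conn.executemany("INSERT INTO numbers VALUES (?)", [(1,), (2,), (3,)])

    def delete(self, conn: sqlite3.Connection) -> None:
        conn.execute("DROP TABLE IF EXISTS numbers")

    def exists(self, conn: sqlite3.Connection) -> bool:
        row = conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
            (self.output,),
        ).fetchone()
        return row is not None


conn = sqlite3.connect(":memory:")
task = CreateNumbers()
if not task.exists(conn):  # skip work that has already been done
    task.run(conn)
task.redo(conn)  # drop and recreate instead of failing on CREATE TABLE
print(conn.execute("SELECT COUNT(*) FROM numbers").fetchone()[0])  # 3
```

Keeping `exists()` separate from `run()` is what lets a caller skip finished work, while `redo()` gives a clean rebuild.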
- class ralsei.task.TaskImpl(this: D, env: ralsei.jinja.ISqlEnvironment)
Bases: ralsei.task.base.Task
Task implementation created from TaskDef arguments
- Parameters:
- this : TaskDef
the settings object for this task
- env : ralsei.jinja.ISqlEnvironment
jinja environment
Warning
Overriding __init__ is discouraged. Perform your initialization in prepare() instead.
Initialization
- env : ralsei.jinja.ISqlEnvironment = None
- _scripts : dict[str, object] = None
You can save your SQL scripts here when you render them; the key-value pairs will be returned by scripts()
Example
    class Impl(TaskImpl):
        def prepare(self, this: "MyTaskDef"):
            # Render once, keep a private copy and expose it through scripts()
            self._scripts["Create table"] = self.__create = self.env.render(this.sql)
- resolve(value: ralsei.graph.Resolves[ralsei.task.base.TaskImpl.resolve.T]) → ralsei.task.base.TaskImpl.resolve.T
Resolve a dependency
- Parameters:
- value : ralsei.graph.OutputOf | T
may or may not need dependency resolution
- Returns:
the resolved value
- Return type:
T
- run(conn: ralsei.connection.ConnectionExt)
- delete(conn: ralsei.connection.ConnectionExt)
- exists(conn: ralsei.connection.ConnectionExt) → bool
- abstract _run(conn: ralsei.connection.ConnectionEnvironment)
Run the task
- abstract _delete(conn: ralsei.connection.ConnectionEnvironment)
Delete whatever _run() has created
- abstract _exists(conn: ralsei.connection.ConnectionEnvironment) → bool
Check if task has already been done
- scripts() → collections.abc.Iterable[tuple[str, object]]
Get SQL scripts rendered by this task
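A hypothetical stand-in can illustrate the intended flow of `TaskImpl`: the base `__init__` stores `env`, starts an empty `_scripts` dict and then calls `prepare()`, and `scripts()` reports whatever `prepare()` saved. The call from `__init__` into `prepare()` is an assumption drawn from the warning about not overriding `__init__`; `TaskImplStandIn`, `FakeEnv` (a toy `{{name}}` substitution instead of jinja) and `MyTaskDef` are made up for illustration.

```python
from collections.abc import Iterable
from typing import Any


class FakeEnv:
    """Hypothetical stand-in for ralsei.jinja.ISqlEnvironment: fills {{name}} slots."""

    def render(self, template: str, **variables: Any) -> str:
        for name, value in variables.items():
            template = template.replace("{{" + name + "}}", str(value))
        return template


class TaskImplStandIn:
    """Hypothetical stand-in for the initialization flow of ralsei.task.TaskImpl."""

    def __init__(self, this: Any, env: FakeEnv) -> None:
        # Subclasses leave __init__ alone ...
        self.env = env
        self._scripts: dict[str, object] = {}
        # ... and customize prepare(), assumed here to be called from __init__
        self.prepare(this)

    def prepare(self, this: Any) -> None:
        pass

    def scripts(self) -> Iterable[tuple[str, object]]:
        # Whatever prepare() saved in _scripts is reported as (name, script) pairs
        return self._scripts.items()


class MyTaskDef:
    sql = "CREATE TABLE {{table}} (id INTEGER)"
    table = "people"


class Impl(TaskImplStandIn):
    def prepare(self, this: MyTaskDef) -> None:
        self._scripts["Create table"] = self.__create = self.env.render(
            this.sql, table=this.table
        )


impl = Impl(MyTaskDef(), FakeEnv())
print(list(impl.scripts()))  # [('Create table', 'CREATE TABLE people (id INTEGER)')]
```

Rendering inside `prepare()` means the SQL can be inspected through `scripts()` before anything touches the database.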
- class ralsei.task.TaskDef
Stores task arguments before said task is created
Any subclass of TaskDef automatically gets the dataclasses.dataclass() decorator applied to it
- Impl : ClassVar[type[ralsei.task.base.TaskImpl[Self]]] = None
The associated task class
Note
This field is not part of the dataclass
Example
    class MyTask(TaskDef):
        class Impl(TaskImpl):
            def prepare(self, this: "MyTask"):
                ...
- create(env: ralsei.jinja.SqlEnvironment) → ralsei.task.base.TaskImpl[Self]
Instantiate the associated Impl
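The automatic dataclass behavior and `create()` can be approximated with `__init_subclass__`. This sketch is an assumption about one way to get that behavior, not ralsei's implementation; `TaskDefStandIn`, `TaskImplStandIn` and `GreetTask` are hypothetical. It relies on the standard `dataclasses` rule that `ClassVar` annotations are not fields, which matches the note that `Impl` is not part of the dataclass.

```python
import dataclasses
from typing import Any, ClassVar


class TaskImplStandIn:
    """Hypothetical stand-in for ralsei.task.TaskImpl."""

    def __init__(self, this: Any, env: Any) -> None:
        self.env = env
        self.prepare(this)

    def prepare(self, this: Any) -> None:
        pass


class TaskDefStandIn:
    """Hypothetical stand-in for ralsei.task.TaskDef."""

    # ClassVar annotations are skipped by dataclasses, so Impl never becomes a field
    Impl: ClassVar[type[TaskImplStandIn]] = TaskImplStandIn

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Turn every subclass into a dataclass, as described for TaskDef
        dataclasses.dataclass(cls)

    def create(self, env: Any) -> TaskImplStandIn:
        # Instantiate the associated Impl with this settings object
        return self.Impl(self, env)


class GreetTask(TaskDefStandIn):
    name: str
    greeting: str = "Hello"

    class Impl(TaskImplStandIn):
        def prepare(self, this: "GreetTask") -> None:
            self.message = f"{this.greeting}, {this.name}!"


impl = GreetTask(name="ralsei").create(env=None)
print(impl.message)  # Hello, ralsei!
print([field.name for field in dataclasses.fields(GreetTask)])  # ['name', 'greeting']
```

The settings object stays a plain, comparable dataclass, while the behavior lives in the nested `Impl`.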
- class ralsei.task.CreateTableSql
Bases: ralsei.task.base.TaskDef
Runs a CREATE TABLE SQL script
Variables passed to the template: table, view
Example
unnest.sql
    CREATE TABLE {{table}}(
        id SERIAL PRIMARY KEY,
        name TEXT
    );
    {%-split-%}
    INSERT INTO {{table}}(name)
    SELECT json_array_elements_text(json->'names')
    FROM {{sources}};
pipeline.py
    "unnest": CreateTableSql(
        sql=Path("./unnest.sql").read_text(),
        table=Table("new_table"),
        locals={"sources": self.outputof("other")},
    )
Note
You can use ralsei.utils.folder() to find SQL files relative to the current file
- sql : str | list[str] = None
SQL template strings
Individual statements must be either separated by the {%split%} tag or pre-split into a list
- table : ralsei.types.Table = None
Table being created
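The pre-split list form of `sql` boils down to executing one statement at a time. This sketch adapts `unnest.sql` to SQLite by hand (a hypothetical `other` table with a plain `name` column instead of JSON, `INTEGER PRIMARY KEY` instead of `SERIAL`); it does not use ralsei or jinja. SQLite's `execute()` accepts a single statement per call, which is one practical reason to keep statements separate.

```python
import sqlite3

# Hypothetical pre-split statement list, hand-adapted to SQLite from unnest.sql
statements = [
    "CREATE TABLE new_table (id INTEGER PRIMARY KEY, name TEXT)",
    "INSERT INTO new_table (name) SELECT name FROM other ORDER BY name",
]

conn = sqlite3.connect(":memory:")
# Hypothetical source table standing in for {{sources}}
conn.execute("CREATE TABLE other (name TEXT)")
conn.executemany("INSERT INTO other VALUES (?)", [("Bob",), ("Alice",)])

# sqlite3 runs one statement per execute() call, so each piece goes separately
for statement in statements:
    conn.execute(statement)

print(conn.execute("SELECT id, name FROM new_table ORDER BY id").fetchall())
# [(1, 'Alice'), (2, 'Bob')]
```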
- class ralsei.task.AddColumnsSql
Bases: ralsei.task.base.TaskDef
Adds the specified Columns to an existing Table and runs the SQL script to fill them with data
Variables passed to the template: table
Columns can be defined in the template itself, using {% set columns = [...] %}
Example
postprocess.sql
    {% set columns = [Column("name_upper", "TEXT")] -%}
    UPDATE {{table}}
    SET name_upper = UPPER(name);
pipeline.py
    "postprocess": AddColumnsSql(
        sql=Path("./postprocess.sql").read_text(),
        table=Table("people"),
    )
Note
You can use ralsei.utils.folder() to find SQL files relative to the current file
- sql : str | list[str] = None
SQL template strings
Individual statements must be either separated by the {%split%} tag or pre-split into a list
- table : ralsei.graph.Resolves[ralsei.types.Table] = None
Table to add columns to
May be the output of another task
- columns : Optional[Sequence[ralsei.types.ColumnBase]] = None
These column definitions take precedence over those defined in the template
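Written by hand against SQLite, the `postprocess` example amounts roughly to one `ALTER TABLE ... ADD COLUMN` per declared column followed by the rendered script. This is a sketch of the effect only, with a hypothetical `people` table; the exact SQL that ralsei generates (and how it quotes identifiers) is not shown in this reference.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical existing table
conn.execute("CREATE TABLE people (name TEXT)")
conn.executemany("INSERT INTO people VALUES (?)", [("alice",), ("bob",)])

# Step 1: add each declared column, here Column("name_upper", "TEXT")
conn.execute("ALTER TABLE people ADD COLUMN name_upper TEXT")
# Step 2: run the rendered script body to fill the new column
conn.execute("UPDATE people SET name_upper = UPPER(name)")

print(conn.execute("SELECT name, name_upper FROM people ORDER BY name").fetchall())
# [('alice', 'ALICE'), ('bob', 'BOB')]
```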
- class ralsei.task.MapToNewTable
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
Variables passed to jinja:
- table = table
- source = source_table
- is_done = is_done_column (as ralsei.types.Identifier)
Example
    import requests
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewTable,
        Table,
        ValueColumn,
        Placeholder,
        compose,
        add_to_input,
        pop_id_fields,
    )


    def download(url: str) -> str:
        # Fetch a page and return its HTML
        response = requests.get(url)
        response.raise_for_status()
        return response.text


    # Find subjects on the hub page
    def find_subjects(hub_url: str):
        html = download(hub_url)
        sel = Selector(html)
        for row in sel.xpath("//table/tr"):
            yield {
                "subject": row.xpath("a/text()").get(),
                "url": row.xpath("a/@href").get(),
            }


    # Download all pages in a subject rating
    def download_pages(url: str):
        next_url = url
        page = 1
        while next_url is not None:
            html = download(next_url)
            yield {"page": page, "html": html}
            sel = Selector(html)
            # Follow the href of the "next" link; None when there is no next page
            next_url = sel.xpath("//a[@id = 'next']/@href").get()
            page += 1


    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "subjects": MapToNewTable(
                    table=Table("subjects"),
                    columns=[
                        "id SERIAL PRIMARY KEY",
                        ValueColumn("subject", "TEXT"),
                        ValueColumn("url", "TEXT"),
                    ],
                    fn=compose(
                        find_subjects,
                        add_to_input(hub_url="https://rating.com/2022"),
                    ),
                ),
                "pages": MapToNewTable(
                    source_table=self.outputof("subjects"),
                    select="SELECT id, url FROM {{source}} WHERE NOT {{is_done}}",
                    table=Table("pages"),
                    columns=[
                        ValueColumn(
                            "subject_id",
                            "INT REFERENCES {{source}}(id)",
                            Placeholder("id"),
                        ),
                        ValueColumn("page", "INT"),
                        ValueColumn("html", "TEXT"),
                        "date_downloaded DATE DEFAULT NOW()",
                    ],
                    is_done_column="__downloaded",
                    fn=compose(download_pages, pop_id_fields("id")),
                ),
            }
The table body is generated from all columns; the INSERT statement is generated only from ralsei.types.ValueColumnBase columns.
- table : ralsei.types.Table = None
The new table being created
- columns : Sequence[str | ralsei.types.ValueColumnBase] = None
Columns (and constraints) that make up the table definition
Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation.
str columns and the type of ralsei.types.ValueColumn are passed through the jinja renderer
- fn : ralsei.wrappers.OneToMany = None
A generator function, mapping one row to many rows
If the id_fields argument is omitted, the task will try to infer id_fields from metadata left by ralsei.wrappers.pop_id_fields()
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = field(...)
Task-scoped context-manager arguments passed to fn
Example
    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver


    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        try:
            yield browser
        finally:
            # Close the browser even if a row fails
            browser.quit()


    def scrape_page(browser: webdriver.Chrome):
        ...


    MapToNewTable(
        fn=scrape_page,
        context={"browser": browser_context},
    )
- select : Optional[str] = None
The SELECT statement that generates rows passed to fn as arguments
If not specified, fn will only run once with 0 arguments.
- source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None
The table where the input rows come from
If you are not creating is_done_column, you can leave it as None.
May be the output of another task.
- is_done_column : Optional[str] = None
Create a boolean column with the given name in source_table that tracks which rows have been processed
If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
Note
Make sure to include WHERE NOT {{is_done}} in your select statement
- id_fields : Optional[list[ralsei.types.IdColumn]] = None
Columns that uniquely identify a row in source_table, so that you can update is_done_column
This argument takes precedence over id_fields inferred from fn's metadata
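The `is_done_column` workflow can be sketched with `sqlite3` alone: select only unprocessed rows, insert every row yielded by a one-to-many function, flag the source row, and commit per source row so an interrupted run can resume. Table, column and function names (`sentences`, `words`, `__done`, `split_words`) are hypothetical, and the row id is passed to the function explicitly instead of going through `pop_id_fields`. This illustrates the described behavior, not ralsei's generated SQL.

```python
import sqlite3
from collections.abc import Iterator


def split_words(row_id: int, text: str) -> Iterator[dict]:
    # Hypothetical one-to-many map function: one sentence -> one row per word
    for position, word in enumerate(text.split()):
        yield {"source_id": row_id, "position": position, "word": word}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sentences (id INTEGER PRIMARY KEY, text TEXT)")
conn.executemany(
    "INSERT INTO sentences (text) VALUES (?)",
    [("hello world",), ("one two three",)],
)
# Boolean progress flag on the source table, playing the role of is_done_column
conn.execute("ALTER TABLE sentences ADD COLUMN __done BOOLEAN NOT NULL DEFAULT 0")
conn.execute("CREATE TABLE words (source_id INTEGER, position INTEGER, word TEXT)")
conn.commit()

# WHERE NOT __done skips rows that an earlier, interrupted run already finished
pending = conn.execute("SELECT id, text FROM sentences WHERE NOT __done").fetchall()
for row_id, text in pending:
    conn.executemany(
        "INSERT INTO words VALUES (:source_id, :position, :word)",
        list(split_words(row_id, text)),
    )
    conn.execute("UPDATE sentences SET __done = 1 WHERE id = ?", (row_id,))
    conn.commit()  # one commit per source row, so finished rows stay finished

print(conn.execute("SELECT COUNT(*) FROM words").fetchone()[0])  # 5
```

Flagging the row and inserting its outputs in the same transaction keeps the two in step: after a crash, a row is either fully processed or picked up again.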
- class ralsei.task.MapToNewColumns
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, saving outputs into new columns on the same row
Variables passed to jinja:
- table = table
- is_done = is_done_column (as ralsei.types.Identifier)
Example
    import requests
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewColumns,
        Table,
        ValueColumn,
        Sql,
        compose_one,
        pop_id_fields,
    )


    def download(url: str):
        response = requests.get(url)
        response.raise_for_status()
        return {"html": response.text}


    def parse(html: str):
        sel = Selector(html)
        return {
            "title": sel.xpath("//h1/text()").get(),
            "rating": sel.xpath("//div[@id='rating']/text()").get(),
        }


    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "download": MapToNewColumns(
                    table=Table("pages"),
                    select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                    columns=[
                        ValueColumn("html", "TEXT"),
                        ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                    ],
                    is_done_column="__downloaded",
                    fn=compose_one(download, pop_id_fields("id")),
                ),
                "parse": MapToNewColumns(
                    table=self.outputof("download"),
                    select="SELECT id, html FROM {{table}}",
                    columns=[
                        ValueColumn("title", "TEXT"),
                        ValueColumn("rating", "TEXT"),
                    ],
                    fn=compose_one(parse, pop_id_fields("id")),
                ),
            }
- table : ralsei.graph.Resolves[ralsei.types.Table] = None
Table to add columns to
May be the output of another task
- columns : Sequence[ralsei.types.ValueColumnBase] = None
List of new columns
Used for ADD COLUMN and UPDATE statement generation.
- fn : ralsei.wrappers.OneToOne = None
Function that maps one row to values of the new columns in the same row
If the id_fields argument is omitted, the task will try to infer id_fields from metadata left by ralsei.wrappers.pop_id_fields()
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = field(...)
Task-scoped context-manager arguments passed to fn
Example
    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver


    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        try:
            yield browser
        finally:
            # Close the browser even if a row fails
            browser.quit()


    def scrape_page(browser: webdriver.Chrome):
        ...


    MapToNewColumns(
        fn=scrape_page,
        context={"browser": browser_context},
    )
- is_done_column : Optional[str] = None
Create a boolean column with the given name in table that tracks which rows have been processed
If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
Note
Make sure to include WHERE NOT {{is_done}} in your select statement
- id_fields : Optional[list[ralsei.types.IdColumn]] = None
Columns that uniquely identify a row in table, so that you can update is_done_column
This argument takes precedence over id_fields inferred from fn's metadata
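A similar stdlib-only sketch for MapToNewColumns: new columns are added, a one-to-one function maps each selected row to their values, and an `UPDATE` keyed on the id writes them back to the same row. It also models `context` under an assumption: the context manager is entered once for the whole task and its value is passed to `fn` as a keyword argument for each row. `contextlib.contextmanager` stands in for `reusable_contextmanager_const`, and `fake_browser`, `measure` and the `pages` columns are hypothetical.

```python
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager

events: list[str] = []


@contextmanager
def fake_browser() -> Iterator[str]:
    # Hypothetical task-scoped resource: opened once and shared by every row
    events.append("open")
    try:
        yield "browser-1"
    finally:
        events.append("close")


def measure(url: str, browser: str) -> dict:
    # Hypothetical one-to-one map function: one input row -> values for new columns
    return {"url_length": len(url), "fetched_with": browser}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany(
    "INSERT INTO pages (url) VALUES (?)",
    [("https://a.example",), ("https://bb.example",)],
)
# One ADD COLUMN per new column
conn.execute("ALTER TABLE pages ADD COLUMN url_length INTEGER")
conn.execute("ALTER TABLE pages ADD COLUMN fetched_with TEXT")

rows = conn.execute("SELECT id, url FROM pages").fetchall()
with fake_browser() as browser:  # entered once for the whole task, not per row
    for row_id, url in rows:
        values = measure(url, browser=browser)
        # Write the outputs back to the same row, located by its id field
        conn.execute(
            "UPDATE pages SET url_length = :url_length, fetched_with = :fetched_with"
            " WHERE id = :id",
            {**values, "id": row_id},
        )
conn.commit()

print(conn.execute("SELECT id, url_length, fetched_with FROM pages ORDER BY id").fetchall())
# [(1, 17, 'browser-1'), (2, 18, 'browser-1')]
print(events)  # ['open', 'close']
```

Opening an expensive resource such as a browser once per task, rather than once per row, is the main payoff of a task-scoped context.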
- class ralsei.task.CreateTableTask
Bases: ralsei.task.base.TaskImpl
Base class for a task that performs table creation
All you have to do is call _prepare_table() from within ralsei.task.TaskImpl.prepare().
output, _exists() and _delete() are implemented for you, leaving only the ralsei.task.TaskImpl._run() part
Example
    from pathlib import Path

    import pandas as pd
    from ralsei.connection import ConnectionEnvironment
    from ralsei.task import CreateTableTask, TaskDef
    from ralsei.types import Table


    class UploadCsv(TaskDef):
        table: Table
        path: Path

        class Impl(CreateTableTask):
            def prepare(self, this: "UploadCsv"):
                # Registers the table, so output, _exists() and _delete() work
                self._prepare_table(this.table)
                self.__path = this.path

            def _run(self, conn: ConnectionEnvironment):
                with self.__path.open() as file:
                    pd.read_csv(file).to_sql(
                        self._table.name,
                        conn.sqlalchemy,
                        schema=self._table.schema,
                    )
- _table : ralsei.types.Table = None
- _drop_sql : sqlalchemy.sql.elements.TextClause = None
- _prepare_table(table: ralsei.types.Table, view: bool = False)
- _exists(conn: ralsei.connection.ConnectionEnvironment) → bool
- _delete(conn: ralsei.connection.ConnectionEnvironment)
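The division of labor in CreateTableTask can be mimicked with a stand-in where `_prepare_table()` records the table and pre-builds a `DROP` statement, so `output`, `_exists()` and `_delete()` come for free and a subclass only writes `_run()`. Everything here is hypothetical (`CreateTableTaskStandIn`, `UploadRowsDef`, `UploadRows`), uses SQLite's `sqlite_master` catalog, and only approximates the role of `_drop_sql` and the `view` flag.

```python
import sqlite3
from dataclasses import dataclass
from typing import Any


class CreateTableTaskStandIn:
    """Hypothetical stand-in for the split of work described for CreateTableTask."""

    def __init__(self, this: Any) -> None:
        self.prepare(this)

    def prepare(self, this: Any) -> None:
        raise NotImplementedError

    def _prepare_table(self, table: str, view: bool = False) -> None:
        # Remember the table (or view) and pre-build its DROP statement
        self._table = table
        self._view = view
        kind = "VIEW" if view else "TABLE"
        self._drop_sql = f'DROP {kind} IF EXISTS "{table}"'

    @property
    def output(self) -> str:
        return self._table

    def _exists(self, conn: sqlite3.Connection) -> bool:
        kind = "view" if self._view else "table"
        row = conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?",
            (kind, self._table),
        ).fetchone()
        return row is not None

    def _delete(self, conn: sqlite3.Connection) -> None:
        conn.execute(self._drop_sql)


@dataclass
class UploadRowsDef:
    # Hypothetical settings object playing the role of a TaskDef
    table: str
    rows: list[tuple[str, int]]


class UploadRows(CreateTableTaskStandIn):
    def prepare(self, this: UploadRowsDef) -> None:
        self._prepare_table(this.table)
        self.__rows = this.rows

    # Only the creation logic is left to write
    def _run(self, conn: sqlite3.Connection) -> None:
        conn.execute(f'CREATE TABLE "{self._table}" (name TEXT, score INTEGER)')
        conn.executemany(f'INSERT INTO "{self._table}" VALUES (?, ?)', self.__rows)


conn = sqlite3.connect(":memory:")
task = UploadRows(UploadRowsDef("scores", [("alice", 3), ("bob", 5)]))
task._run(conn)
print(task._exists(conn))  # True
task._delete(conn)
print(task._exists(conn))  # False
```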
- class ralsei.task.AddColumnsTask
Bases: ralsei.task.base.TaskImpl
Base class for a task that adds columns to a table
All you have to do is call _prepare_columns() from within ralsei.task.TaskImpl.prepare().
output, _exists() and _delete() are implemented for you, leaving only the ralsei.task.TaskImpl._run() part
- _table : ralsei.types.Table = None
- _columns : list[ralsei.types.ColumnRendered] = None
- _add_columns : ralsei.db_actions.AddColumns = None
- _drop_columns : ralsei.db_actions.DropColumns = None
- _prepare_columns(table: ralsei.graph.Resolves[ralsei.types.Table], columns: collections.abc.Sequence[ralsei.types.ColumnBase], *, if_not_exists: bool = False)
- _exists(conn: ralsei.connection.ConnectionEnvironment) → bool
- _delete(conn: ralsei.connection.ConnectionEnvironment)
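The same pattern for AddColumnsTask, as a hypothetical SQLite stand-in: `_prepare_columns()` records the table and `(name, type)` pairs, `_exists()` checks them against `PRAGMA table_info`, and `if_not_exists=True` makes re-adding a no-op. Treating "every declared column is present" as the existence test, and the `_add()` helper, are assumptions of this sketch rather than documented behavior; `_delete()` is left out.

```python
import sqlite3
from types import SimpleNamespace
from typing import Any


class AddColumnsTaskStandIn:
    """Hypothetical stand-in for the split of work described for AddColumnsTask."""

    def __init__(self, this: Any) -> None:
        self.prepare(this)

    def prepare(self, this: Any) -> None:
        raise NotImplementedError

    def _prepare_columns(
        self, table: str, columns: list[tuple[str, str]], *, if_not_exists: bool = False
    ) -> None:
        # Record the target table and (name, type) pairs for later use
        self._table = table
        self._columns = columns
        self._if_not_exists = if_not_exists

    def _existing_columns(self, conn: sqlite3.Connection) -> set[str]:
        # PRAGMA table_info yields one row per column; index 1 holds the name
        return {row[1] for row in conn.execute(f'PRAGMA table_info("{self._table}")')}

    def _exists(self, conn: sqlite3.Connection) -> bool:
        # Assumption: "done" means every declared column is present
        existing = self._existing_columns(conn)
        return all(name in existing for name, _ in self._columns)

    def _add(self, conn: sqlite3.Connection) -> None:
        existing = self._existing_columns(conn)
        for name, type_ in self._columns:
            if self._if_not_exists and name in existing:
                continue  # skip columns that are already there
            conn.execute(f'ALTER TABLE "{self._table}" ADD COLUMN "{name}" {type_}')


class AddNameUpper(AddColumnsTaskStandIn):
    def prepare(self, this: Any) -> None:
        self._prepare_columns(this.table, [("name_upper", "TEXT")], if_not_exists=True)

    # Only the filling logic is left to write
    def _run(self, conn: sqlite3.Connection) -> None:
        self._add(conn)
        conn.execute(f'UPDATE "{self._table}" SET name_upper = UPPER(name)')


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (name TEXT)")
conn.execute("INSERT INTO people VALUES ('alice')")
task = AddNameUpper(SimpleNamespace(table="people"))
print(task._exists(conn))  # False
task._run(conn)
task._run(conn)  # safe to repeat because if_not_exists=True
print(task._exists(conn))  # True
print(conn.execute("SELECT name_upper FROM people").fetchall())  # [('ALICE',)]
```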
- ralsei.task.ROW_CONTEXT_ATRRIBUTE = '__ralsei_row_context'
Attribute that gets added to exceptions that occurred in a row processing context.
Stores the popped fields of the aforementioned row
- ralsei.task.ROW_CONTEXT_VAR : contextvars.ContextVar[dict] = ContextVar(...)
ContextVar storing the popped fields of the currently processed row (used by ralsei.console.console for logging purposes)
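These two objects suggest a pattern that can be sketched with the standard `contextvars` module: set the context variable while a row is processed, and tag any exception raised for that row with its popped fields. The names and the attribute value mirror the documented ones, but these are local copies, and `process_row` and `current_row_logger` are hypothetical; how ralsei itself sets and reads them is not shown here.

```python
import contextvars
from collections.abc import Callable
from typing import Any

# Local copies mirroring the documented names (not imported from ralsei)
ROW_CONTEXT_ATRRIBUTE = "__ralsei_row_context"
ROW_CONTEXT_VAR: contextvars.ContextVar[dict] = contextvars.ContextVar("row_context")


def process_row(popped: dict, fn: Callable[[], Any]) -> Any:
    # Expose the popped fields (e.g. the row id) while fn runs
    token = ROW_CONTEXT_VAR.set(popped)
    try:
        return fn()
    except Exception as exc:
        # Tag the exception with the row it came from, then re-raise
        setattr(exc, ROW_CONTEXT_ATRRIBUTE, popped)
        raise
    finally:
        ROW_CONTEXT_VAR.reset(token)


def current_row_logger() -> str:
    # A logger can read the current row without it being passed explicitly
    return f"processing {ROW_CONTEXT_VAR.get({})}"


print(process_row({"id": 1}, current_row_logger))  # processing {'id': 1}

try:
    process_row({"id": 2}, lambda: 1 / 0)
except ZeroDivisionError as exc:
    caught = exc
print(getattr(caught, ROW_CONTEXT_ATRRIBUTE))  # {'id': 2}
```

A context variable keeps logging code decoupled from the row loop, while the exception attribute preserves the failing row after the variable has been reset.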