ralsei.task
Package Contents
Classes
| Task | Base task class |
| TaskImpl | Task implementation created from TaskDef arguments |
| TaskDef | Stores task arguments before said task is created |
| CreateTableSql | Runs a CREATE TABLE sql script |
| AddColumnsSql | Adds the specified Columns to an existing Table and runs the SQL script to fill them with data |
| MapToNewTable | Applies the provided map function to a query result, mapping a single row to one or more rows in a new table |
| MapToNewColumns | Applies the provided map function to a query result, saving outputs into new columns on the same row |
| CreateTableTask | Base class for a task that performs table creation |
| AddColumnsTask | Base class for a task that adds columns to a table |
Data
| ROW_CONTEXT_ATRRIBUTE | Attribute that gets added to exceptions that occurred in a row processing context. |
| ROW_CONTEXT_VAR | ContextVar storing the popped fields of the currently processed row (used by ralsei.console.console for logging purposes) |
API
- class ralsei.task.Task¶
- Bases: abc.ABC
- Base task class
 - abstract run(conn: ralsei.connection.ConnectionExt)¶
- Run the task 
 - abstract delete(conn: ralsei.connection.ConnectionExt)¶
- Delete whatever run() has created
 - redo(conn: ralsei.connection.ConnectionExt)¶
 - abstract property output : Any¶
- Object created or modified by this task (usually a ralsei.types.Table)
- Used for resolving ralsei.graph.Pipeline.outputof()
 - abstract exists(conn: ralsei.connection.ConnectionExt) bool¶
- Check if task has already been done 
 - scripts() collections.abc.Iterable[tuple[str, object]]¶
- Get SQL scripts rendered by this task
- Returns:¶
- iterable of ("name", script), where script is either:
- a string-like object, usually str or sqlalchemy.sql.elements.TextClause
- a list of string-like objects (in case of multiple statements) 
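As an illustration of consuming that return shape, the sketch below flattens `("name", script)` pairs into individual statements. `iter_statements` and the sample data are hypothetical and not part of ralsei; only the shape of the pairs is taken from the description above.

```python
from collections.abc import Iterable, Iterator


def iter_statements(
    scripts: Iterable[tuple[str, object]],
) -> Iterator[tuple[str, str]]:
    """Yield (name, statement) pairs, one per individual SQL statement."""
    for name, script in scripts:
        # A script is either one string-like object or a list of them
        statements = script if isinstance(script, list) else [script]
        for statement in statements:
            # str() also covers string-like objects that are not plain str
            yield name, str(statement)


# Hypothetical data in the shape that scripts() is documented to return
sample = [
    ("Create table", ["CREATE TABLE t(id INT);", "INSERT INTO t VALUES (1);"]),
    ("Drop table", "DROP TABLE t;"),
]
flattened = list(iter_statements(sample))
```

Normalizing the single-statement case to a one-element list keeps the consuming loop free of special cases.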
 
 
 
- class ralsei.task.TaskImpl(this: D, env: ralsei.jinja.ISqlEnvironment)¶
- Bases: ralsei.task.base.Task
- Task implementation created from TaskDef arguments
- Parameters:¶
- this : TaskDef¶
- the settings object for this task 
- env: ralsei.jinja.ISqlEnvironment¶
- jinja environment 
 
 - Warning - It is advised against overriding __init__. Perform your initialization in prepare() instead.
- Initialization
 - 
env : ralsei.jinja.ISqlEnvironment = None¶
 - 
_scripts : dict[str, object] = None¶
- You can save your SQL scripts here when you render them; the key-value pairs will be returned by scripts()
- Example

    class Impl(TaskImpl):
        def prepare(self, this: "MyTaskDef"):
            # Keep the rendered script both for running and for scripts()
            self._scripts["Create table"] = self.__create = self.env.render(this.sql)

 - resolve(value: ralsei.graph.Resolves[ralsei.task.base.TaskImpl.resolve.T]) ralsei.task.base.TaskImpl.resolve.T¶
- Resolve a dependency - Parameters:¶
- value : ralsei.graph.OutputOf | T¶
- may or may not need dependency resolution 
 
- Returns:¶
- the resolved value 
- Return type:¶
- T 
 
 - run(conn: ralsei.connection.ConnectionExt)¶
 - delete(conn: ralsei.connection.ConnectionExt)¶
 - exists(conn: ralsei.connection.ConnectionExt) bool¶
 - abstract _run(conn: ralsei.connection.ConnectionEnvironment)¶
- Run the task 
 - abstract _delete(conn: ralsei.connection.ConnectionEnvironment)¶
- Delete whatever _run() has created
 - abstract _exists(conn: ralsei.connection.ConnectionEnvironment) bool¶
- Check if task has already been done 
 - scripts() collections.abc.Iterable[tuple[str, object]]¶
- Get SQL scripts rendered by this task 
 
- class ralsei.task.TaskDef¶
- Stores task arguments before said task is created
- Any subclass of TaskDef automatically gets the dataclasses.dataclass() decorator applied to it
 - 
Impl : ClassVar[type[ralsei.task.base.TaskImpl[Self]]] = None¶
- The associated task class
- Note - This field is not part of the dataclass
- Example

    class MyTask(TaskDef):
        class Impl(TaskImpl):
            def prepare(self, this: "MyTask"):
                ...

 - create(env: ralsei.jinja.SqlEnvironment) ralsei.task.base.TaskImpl[Self]¶
- Instantiate the associated Impl
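To make the "settings object plus nested Impl" pattern concrete, here is a minimal standard-library sketch of one way a base class could apply the dataclass decorator to its subclasses and instantiate a nested implementation. `SketchTaskDef`, `SketchImpl` and `Greet` are hypothetical names, and `__init_subclass__` is an assumption chosen for brevity; this is not ralsei's actual implementation.

```python
from dataclasses import dataclass, fields
from typing import Any, ClassVar


class SketchImpl:
    """Hypothetical stand-in for TaskImpl: forwards the settings to prepare()."""

    def __init__(self, this: Any) -> None:
        self.prepare(this)

    def prepare(self, this: Any) -> None:
        raise NotImplementedError


class SketchTaskDef:
    """Hypothetical stand-in for TaskDef (assumption, not ralsei's code)."""

    Impl: ClassVar[type]  # ClassVar, so it never becomes a dataclass field

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        dataclass(cls)  # adds __init__, __repr__ and __eq__ to cls in place

    def create(self) -> Any:
        # Instantiate the nested implementation with this settings object
        return self.Impl(self)


class Greet(SketchTaskDef):
    name: str
    punctuation: str = "!"

    class Impl(SketchImpl):
        def prepare(self, this: "Greet") -> None:
            self.message = f"Hello, {this.name}{this.punctuation}"


task = Greet(name="ralsei").create()
field_names = [f.name for f in fields(Greet)]
```

The nested `Impl` class carries no annotation in `Greet`, so the dataclass machinery leaves it out of the generated `__init__`, matching the note that `Impl` is not part of the dataclass.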
 
- class ralsei.task.CreateTableSql¶
- Bases: ralsei.task.base.TaskDef
- Runs a CREATE TABLE sql script
- Variables passed to the template: table, view
- Example
- unnest.sql

    CREATE TABLE {{table}}(
        id SERIAL PRIMARY KEY,
        name TEXT
    );
    {%-split-%}
    INSERT INTO {{table}}(name)
    SELECT json_array_elements_text(json->'names')
    FROM {{sources}};

- pipeline.py

    "unnest": CreateTableSql(
        sql=Path("./unnest.sql").read_text(),
        table=Table("new_table"),
        locals={"sources": self.outputof("other")},
    )

- Note - You can use ralsei.utils.folder() to find SQL files relative to current file
 - 
sql : str | list[str] = None¶
- SQL template strings
- Individual statements must be either separated by a {%split%} tag or pre-split into a list
 - 
table : ralsei.types.Table = None¶
- Table being created 
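The two accepted shapes of `sql` (one template with split markers, or a pre-split list) can be illustrated with a rough standard-library sketch. It treats the marker as plain text and splits on it with a regular expression, which is an assumption made for illustration: ralsei handles `{%split%}` inside its jinja environment, and `split_statements` is a hypothetical helper. SQLite stands in for the real database.

```python
import re
import sqlite3
from typing import Union

# Matches {%split%}, {% split %} and the whitespace-trimming {%-split-%} form
SPLIT_TAG = re.compile(r"\{%-?\s*split\s*-?%\}")


def split_statements(sql: Union[str, list]) -> list:
    """Return individual statements from a marked template or a list."""
    if isinstance(sql, list):
        return [statement.strip() for statement in sql]
    return [part.strip() for part in SPLIT_TAG.split(sql) if part.strip()]


template = """
CREATE TABLE people(id INTEGER PRIMARY KEY, name TEXT);
{%-split-%}
INSERT INTO people(name) VALUES ('Ralsei');
"""

statements = split_statements(template)
conn = sqlite3.connect(":memory:")
for statement in statements:
    # Each execute() call receives exactly one statement
    conn.execute(statement)
names = [row[0] for row in conn.execute("SELECT name FROM people")]
```

Splitting matters because many database drivers, including `sqlite3.Connection.execute`, accept only one statement per call.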
 
- class ralsei.task.AddColumnsSql¶
- Bases: ralsei.task.base.TaskDef
- Adds the specified Columns to an existing Table and runs the SQL script to fill them with data
- Variables passed to the template: table
- Columns can be defined in the template itself, using {% set columns = [...] %}
- Example
- postprocess.sql

    {% set columns = [Column("name_upper", "TEXT")] -%}
    UPDATE {{table}}
    SET name_upper = UPPER(name);

- pipeline.py

    "postprocess": AddColumnsSql(
        sql=Path("./postprocess.sql").read_text(),
        table=Table("people"),
    )

- Note - You can use ralsei.utils.folder() to find SQL files relative to current file
 - 
sql : str | list[str] = None¶
- SQL template strings
- Individual statements must be either separated by a {%split%} tag or pre-split into a list
 - 
table : ralsei.graph.Resolves[ralsei.types.Table] = None¶
- Table to add columns to
- May be the output of another task
 - 
columns : Optional[Sequence[ralsei.types.ColumnBase]] = None¶
- These column definitions take precedence over those defined in the template
 
- class ralsei.task.MapToNewTable¶
- Bases: ralsei.task.base.TaskDef
- Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
- Variables passed to jinja:
- table = table
- source = source_table
- is_done = is_done_column (as ralsei.types.Identifier)
 - Example

    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewTable,
        Table,
        ValueColumn,
        Placeholder,
        compose,
        add_to_input,
        pop_id_fields,
    )

    # download() is assumed to be a user-defined helper
    # that fetches a URL and returns the page HTML

    # Find subjects on the hub page
    def find_subjects(hub_url: str):
        html = download(hub_url)
        sel = Selector(html)
        for row in sel.xpath("//table/tr"):
            yield {
                "subject": row.xpath("a/text()").get(),
                "url": row.xpath("a/@href").get()
            }

    # Download all pages in a subject rating
    def download_pages(url: str):
        next_url = url
        page = 1
        while next_url is not None:
            html = download(next_url)
            yield { "page": page, "html": html }
            sel = Selector(html)
            # Follow the link target, not the <a> element itself
            next_url = sel.xpath("//a[@id = 'next']/@href").get()
            page += 1

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "subjects": MapToNewTable(
                    table=Table("subjects"),
                    columns=[ # (1)
                        "id SERIAL PRIMARY KEY",
                        ValueColumn("subject", "TEXT"),
                        ValueColumn("url", "TEXT"),
                    ],
                    fn=compose(
                        find_subjects,
                        add_to_input(hub_url="https://rating.com/2022")
                    )
                ),
                "pages": MapToNewTable(
                    source_table=self.outputof("subjects"),
                    select="""\
                    SELECT id, url FROM {{source}}
                    WHERE NOT {{is_done}}""",
                    table=Table("pages"),
                    columns=[
                        ValueColumn(
                            "subject_id",
                            "INT REFERENCES {{source}}(id)",
                            Placeholder("id")
                        ),
                        ValueColumn("page", "INT"),
                        ValueColumn("html", "TEXT"),
                        "date_downloaded DATE DEFAULT NOW()",
                    ],
                    is_done_column="__downloaded",
                    fn=compose(download_pages, pop_id_fields("id"))
                )
            }

- (1) The table body is generated from all columns, the INSERT statement only from ralsei.types.ValueColumnBase
 - 
table : ralsei.types.Table = None¶
- The new table being created 
 - 
columns : Sequence[str | ralsei.types.ValueColumnBase] = None¶
- Columns (and constraints) that make up the table definition
- Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation
- str columns and ralsei.types.ValueColumn’s type are passed through the jinja renderer
 - 
fn : ralsei.wrappers.OneToMany = None¶
- A generator function, mapping one row to many rows
- If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
 - 
context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'¶
- Task-scoped context-manager arguments passed to fn
- Example

    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver

    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        yield browser
        browser.quit()

    def scrape_page(browser: webdriver.Chrome):
        ...

    MapToNewTable(
        fn=scrape_page,
        context={"browser": browser_context}
    )

 - 
select : Optional[str] = None¶
- The SELECT statement that generates rows passed to fn as arguments
- If not specified, fn will only run once with 0 arguments.
 - 
source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None¶
- The table where the input rows come from
- If not creating is_done_column, you can leave it as None. May be the output of another task.
 - 
is_done_column : Optional[str] = None¶
- Create a boolean column with the given name in source_table that tracks which rows have been processed
- If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
- Note - Make sure to include WHERE NOT {{is_done}} in your select statement
 - 
id_fields : Optional[list[ralsei.types.IdColumn]] = None¶
- Columns that uniquely identify a row in source_table, so that you can update is_done_column
- This argument takes precedence over id_fields inferred from fn’s metadata
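The interplay of `select`, `fn`, `is_done_column` and `id_fields` can be pictured with a self-contained sketch that uses SQLite instead of ralsei. The function `map_to_new_table`, the `sentences` and `words` tables and the `__done` column are all hypothetical; the sketch only mirrors the documented behaviour (one input row produces many output rows, with a commit after each successful call of `fn`) and is not the library's implementation.

```python
import sqlite3
from collections.abc import Callable, Iterator


def split_words(text: str) -> Iterator[dict]:
    """One-to-many map function: one sentence in, one row per word out."""
    for position, word in enumerate(text.split()):
        yield {"position": position, "word": word}


def map_to_new_table(
    conn: sqlite3.Connection, fn: Callable[[str], Iterator[dict]]
) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS words("
        "sentence_id INT REFERENCES sentences(id), position INT, word TEXT)"
    )
    # Add the progress-tracking column to the source table if it is missing
    existing = [row[1] for row in conn.execute("PRAGMA table_info(sentences)")]
    if "__done" not in existing:
        conn.execute(
            'ALTER TABLE sentences ADD COLUMN "__done" INT NOT NULL DEFAULT 0'
        )
    # Equivalent of the select statement with WHERE NOT {{is_done}}
    pending = conn.execute(
        'SELECT id, text FROM sentences WHERE NOT "__done" ORDER BY id'
    ).fetchall()
    for sentence_id, text in pending:
        # "with conn" commits on success and rolls back if fn raises,
        # so a row is only marked done together with its outputs
        with conn:
            for out in fn(text):
                conn.execute(
                    "INSERT INTO words VALUES (?, ?, ?)",
                    (sentence_id, out["position"], out["word"]),
                )
            # id plays the role of id_fields: it locates the processed row
            conn.execute(
                'UPDATE sentences SET "__done" = 1 WHERE id = ?', (sentence_id,)
            )
```

If `fn` fails partway through, calling `map_to_new_table` again skips every row already marked done and continues with the remaining ones.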
 
- class ralsei.task.MapToNewColumns¶
- Bases: ralsei.task.base.TaskDef
- Applies the provided map function to a query result, saving outputs into new columns on the same row
- Variables passed to jinja:
- table = table
- is_done = is_done_column (as ralsei.types.Identifier)
 - Example

    import requests
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewColumns,
        Table,
        ValueColumn,
        Sql,
        compose_one,
        pop_id_fields,
    )

    def download(url: str):
        response = requests.get(url)
        response.raise_for_status()
        return {"html": response.text}

    def parse(html: str):
        sel = Selector(html)
        return {
            "title": sel.xpath("//h1/text()").get(),
            "rating": sel.xpath("//div[@id='rating']/text()").get(),
        }

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "download": MapToNewColumns(
                    table=Table("pages"),
                    select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                    columns=[
                        ValueColumn("html", "TEXT"),
                        ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                    ],
                    is_done_column="__downloaded",
                    fn=compose_one(download, pop_id_fields("id")),
                ),
                "parse": MapToNewColumns(
                    table=self.outputof("download"),
                    select="SELECT id, html FROM {{table}}",
                    columns=[
                        ValueColumn("title", "TEXT"),
                        ValueColumn("rating", "TEXT"),
                    ],
                    fn=compose_one(parse, pop_id_fields("id")),
                ),
            }

 - 
table : ralsei.graph.Resolves[ralsei.types.Table] = None¶
- Table to add columns to
- May be the output of another task
 - 
columns : Sequence[ralsei.types.ValueColumnBase] = None¶
- List of new columns
- Used for ADD COLUMN and UPDATE statement generation.
 - 
fn : ralsei.wrappers.OneToOne = None¶
- Function that maps one row to values of the new columns in the same row
- If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
 - 
context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'¶
- Task-scoped context-manager arguments passed to fn
- Example

    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver

    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        yield browser
        browser.quit()

    def scrape_page(browser: webdriver.Chrome):
        ...

    MapToNewColumns(
        fn=scrape_page,
        context={"browser": browser_context}
    )

 - 
is_done_column : Optional[str] = None¶
- Create a boolean column with the given name in table that tracks which rows have been processed
- If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
- Note - Make sure to include WHERE NOT {{is_done}} in your select statement
 - 
id_fields : Optional[list[ralsei.types.IdColumn]] = None¶
- Columns that uniquely identify a row in table, so that you can update is_done_column
- This argument takes precedence over id_fields inferred from fn’s metadata
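The idea behind the task-scoped `context` argument (open an expensive resource once, pass it to every call of `fn`, close it when the task ends) can be sketched with the standard library alone. Everything here is hypothetical: `run_with_context` and `fake_browser` are illustrative names, and the `context` values are plain context-manager factories rather than ralsei's reusable context managers.

```python
from contextlib import ExitStack, contextmanager

events: list = []


@contextmanager
def fake_browser():
    """Hypothetical stand-in for an expensive resource such as a browser."""
    events.append("open")
    try:
        yield "browser"
    finally:
        events.append("close")


def run_with_context(fn, rows, context):
    """Call fn once per row, sharing the same context objects across rows."""
    with ExitStack() as stack:
        # Enter every context manager once, before the first row
        resources = {
            name: stack.enter_context(make()) for name, make in context.items()
        }
        # Row values and context objects are both passed as keyword arguments
        return [fn(**row, **resources) for row in rows]


def scrape_page(url: str, browser: str) -> str:
    return f"{browser} fetched {url}"


results = run_with_context(
    scrape_page,
    rows=[{"url": "a"}, {"url": "b"}],
    context={"browser": fake_browser},
)
```

`ExitStack` closes every entered resource when the block exits, including when `fn` raises, which is why the sketch uses it instead of nesting `with` statements by hand.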
 
- class ralsei.task.CreateTableTask¶
- Bases: ralsei.task.base.TaskImpl
- Base class for a task that performs table creation
- All you have to do is call _prepare_table() from within ralsei.task.TaskImpl.prepare(). output, _exists() and _delete() are implemented for you, leaving only the ralsei.task.TaskImpl._run() part
- Example

    from pathlib import Path

    import pandas as pd
    from ralsei.connection import ConnectionEnvironment
    from ralsei.task import CreateTableTask, TaskDef
    from ralsei.types import Table

    class UploadCsv(TaskDef):
        table: Table
        path: Path

        class Impl(CreateTableTask):
            def prepare(self, this: "UploadCsv"):
                self._prepare_table(this.table)
                self.__path = this.path

            def _run(self, conn: ConnectionEnvironment):
                with self.__path.open() as file:
                    pd.read_csv(file).to_sql(
                        self._table.name,
                        conn.sqlalchemy,
                        schema=self._table.schema
                    )

 - 
_table : ralsei.types.Table = None¶
 - 
_drop_sql : sqlalchemy.sql.elements.TextClause = None¶
 - 
_prepare_table(table: ralsei.types.Table, view: bool = False)¶
 - _exists(conn: ralsei.connection.ConnectionEnvironment) bool¶
 - _delete(conn: ralsei.connection.ConnectionEnvironment)¶
 
- class ralsei.task.AddColumnsTask¶
- Bases: ralsei.task.base.TaskImpl
- Base class for a task that adds columns to a table
- All you have to do is call _prepare_columns() from within ralsei.task.TaskImpl.prepare(). output, _exists() and _delete() are implemented for you, leaving only the ralsei.task.TaskImpl._run() part
 - 
_table : ralsei.types.Table = None¶
 - 
_columns : list[ralsei.types.ColumnRendered] = None¶
 - 
_add_columns : ralsei.db_actions.AddColumns = None¶
 - 
_drop_columns : ralsei.db_actions.DropColumns = None¶
 - 
_prepare_columns(table: ralsei.graph.Resolves[ralsei.types.Table], columns: collections.abc.Sequence[ralsei.types.ColumnBase], *, if_not_exists: bool = False)¶
 - _exists(conn: ralsei.connection.ConnectionEnvironment) bool¶
 - _delete(conn: ralsei.connection.ConnectionEnvironment)¶
 
- 
ralsei.task.ROW_CONTEXT_ATRRIBUTE = '__ralsei_row_context'¶
- Attribute that gets added to exceptions that occurred in a row processing context.
 Stores the popped fields of the aforementioned row
- 
ralsei.task.ROW_CONTEXT_VAR : contextvars.ContextVar[dict] = 'ContextVar(...)'¶
- ContextVar storing the popped fields of the currently processed row (used by ralsei.console.console for logging purposes)
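How the two pieces of row context could fit together is sketched below using only the standard library. The names prefixed with `SKETCH_` and the `process_row` helper are hypothetical and do not reproduce ralsei's code; the sketch only shows the general pattern of publishing the popped id fields in a ContextVar while a row is processed and attaching them to any exception raised meanwhile.

```python
from contextvars import ContextVar

# Hypothetical counterparts of the two module-level objects described above
SKETCH_ROW_CONTEXT_ATTRIBUTE = "__sketch_row_context"
SKETCH_ROW_CONTEXT_VAR: ContextVar[dict] = ContextVar("sketch_row_context")


def process_row(fn, row: dict, id_fields: list):
    remaining = dict(row)
    # Pop the identifying fields so that fn only receives the payload
    popped = {name: remaining.pop(name) for name in id_fields}
    token = SKETCH_ROW_CONTEXT_VAR.set(popped)
    try:
        return fn(**remaining)
    except Exception as err:
        # Tag the exception with the row that was being processed
        setattr(err, SKETCH_ROW_CONTEXT_ATTRIBUTE, popped)
        raise
    finally:
        # Restore the previous value once the row is finished
        SKETCH_ROW_CONTEXT_VAR.reset(token)


def parse(html: str) -> dict:
    if not html:
        raise ValueError("empty page")
    # Code running inside fn (a logger, for example) can read the current row
    return {"length": len(html), "row": SKETCH_ROW_CONTEXT_VAR.get()}


ok = process_row(parse, {"id": 1, "html": "<p>hi</p>"}, ["id"])
try:
    process_row(parse, {"id": 2, "html": ""}, ["id"])
except ValueError as err:
    failed_row = getattr(err, SKETCH_ROW_CONTEXT_ATTRIBUTE)
```

An error handler higher up the stack can then report which row failed without `fn` having to know its own id.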