ralsei.task
Package Contents
Classes
Task | Base task class
TaskImpl | Task implementation created from TaskDef arguments
TaskDef | Stores task arguments before said task is created
CreateTableSql | Runs a CREATE TABLE sql script
AddColumnsSql | Adds the specified Columns to an existing Table and runs the SQL script to fill them with data
MapToNewTable | Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
MapToNewColumns | Applies the provided map function to a query result, saving outputs into new columns on the same row
CreateTableTask | Base class for a task that performs table creation
AddColumnsTask | Base class for a task that adds columns to a table
Data
ROW_CONTEXT_ATRRIBUTE | Attribute that gets added to exceptions that occurred in a row processing context.
ROW_CONTEXT_VAR | ContextVar storing the popped fields of the currently processed row (used by ralsei.console.console for logging purposes)
API
- class ralsei.task.Task
Bases: abc.ABC
Base task class
- abstract run(conn: ralsei.connection.ConnectionExt)
Run the task
- abstract delete(conn: ralsei.connection.ConnectionExt)
Delete whatever run() has created
- redo(conn: ralsei.connection.ConnectionExt)
- abstract property output : Any
Object created or modified by this task (usually a ralsei.types.Table). Used for resolving ralsei.graph.Pipeline.outputof()
- abstract exists(conn: ralsei.connection.ConnectionExt) -> bool
Check if task has already been done
- scripts() -> collections.abc.Iterable[tuple[str, object]]
Get SQL scripts rendered by this task
- Returns:
iterable of ("name", script), where script is either:
- a string-like object, usually str or sqlalchemy.sql.elements.TextClause
- a list of string-like objects (in case of multiple statements)
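The interface above can be illustrated with a minimal, library-free sketch. `TaskSketch` and `SetKey` are hypothetical stand-ins, a plain dict plays the role of the connection, and `redo()` is assumed to mean "delete, then run again", which the reference above does not state explicitly.

```python
from abc import ABC, abstractmethod


class TaskSketch(ABC):
    """Hypothetical stand-in mirroring the Task interface described above."""

    @abstractmethod
    def run(self, conn): ...

    @abstractmethod
    def delete(self, conn): ...

    @abstractmethod
    def exists(self, conn) -> bool: ...

    def redo(self, conn):
        # Assumption: redo = remove the old output, then run again
        self.delete(conn)
        self.run(conn)


class SetKey(TaskSketch):
    """Toy task: 'creates' one key in a dict that plays the role of a database."""

    def __init__(self, key, value):
        self.key = key
        self.value = value

    def run(self, conn):
        conn[self.key] = self.value

    def delete(self, conn):
        conn.pop(self.key, None)

    def exists(self, conn) -> bool:
        return self.key in conn


db = {}
task = SetKey("greeting", "hello")
if not task.exists(db):  # skip work that has already been done
    task.run(db)
task.value = "hi"
task.redo(db)  # drop the stale output and rebuild it
```

Keeping `exists()` separate from `run()` is what lets a runner skip finished tasks, while `delete()` gives it a way to undo exactly what `run()` produced.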
- class ralsei.task.TaskImpl(this: D, env: ralsei.jinja.ISqlEnvironment)
Bases: ralsei.task.base.Task
Task implementation created from TaskDef arguments
- Parameters:
- this : TaskDef
the settings object for this task
- env : ralsei.jinja.ISqlEnvironment
jinja environment
Warning
It is advised against overriding __init__. Perform your initialization in prepare() instead.
Initialization
- env : ralsei.jinja.ISqlEnvironment = None
- _scripts : dict[str, object] = None
You can save your sql scripts here when you render them; the key-value pairs will be returned by scripts()
Example
    class Impl(TaskImpl):
        def prepare(self, this: "MyTaskDef"):
            self._scripts["Create table"] = self.__create = self.env.render(this.sql)
- resolve(value: ralsei.graph.Resolves[ralsei.task.base.TaskImpl.resolve.T]) -> ralsei.task.base.TaskImpl.resolve.T
Resolve a dependency
- Parameters:
- value : ralsei.graph.OutputOf | T
may or may not need dependency resolution
- Returns:
the resolved value
- Return type:
T
- run(conn: ralsei.connection.ConnectionExt)
- delete(conn: ralsei.connection.ConnectionExt)
- exists(conn: ralsei.connection.ConnectionExt) -> bool
- abstract _run(conn: ralsei.connection.ConnectionEnvironment)
Run the task
- abstract _delete(conn: ralsei.connection.ConnectionEnvironment)
Delete whatever _run() has created
- abstract _exists(conn: ralsei.connection.ConnectionEnvironment) -> bool
Check if task has already been done
- scripts() -> collections.abc.Iterable[tuple[str, object]]
Get SQL scripts rendered by this task
- class ralsei.task.TaskDef
Stores task arguments before said task is created
Any subclass of TaskDef automatically gets the dataclasses.dataclass() decorator applied to it
- Impl : ClassVar[type[ralsei.task.base.TaskImpl[Self]]] = None
The associated task class
Note
This field is not part of the dataclass
Example
    class MyTask(TaskDef):
        class Impl(TaskImpl):
            def prepare(self, this: "MyTask"):
                ...
- create(env: ralsei.jinja.SqlEnvironment) -> ralsei.task.base.TaskImpl[Self]
Instantiate the associated Impl
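One way a base class can apply `dataclasses.dataclass()` to every subclass, and pair each settings object with a nested `Impl`, is sketched below. This is an illustration of the pattern only, not ralsei's actual implementation; `TaskDefSketch`, `TaskImplSketch` and `Greet` are hypothetical names.

```python
from dataclasses import dataclass, fields
from typing import ClassVar


class TaskImplSketch:
    """Hypothetical stand-in for TaskImpl: receives the settings object."""

    def __init__(self, this, env):
        self.env = env
        self.prepare(this)  # subclasses initialize here instead of __init__

    def prepare(self, this):
        raise NotImplementedError


class TaskDefSketch:
    """Hypothetical stand-in for TaskDef."""

    Impl: ClassVar[type]  # ClassVar annotations are never dataclass fields

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        dataclass(cls)  # modifies cls in place, generating __init__ and friends

    def create(self, env):
        # Instantiate the associated Impl with this settings object
        return self.Impl(self, env)


class Greet(TaskDefSketch):
    name: str
    punctuation: str = "!"

    class Impl(TaskImplSketch):
        def prepare(self, this: "Greet"):
            self.message = f"Hello, {this.name}{this.punctuation}"


task = Greet(name="world").create(env=None)
print(task.message)
print([f.name for f in fields(Greet)])
```

Because the nested `Impl` class carries no field annotation of its own, only `name` and `punctuation` become dataclass fields in this sketch.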
- class ralsei.task.CreateTableSql
Bases: ralsei.task.base.TaskDef
Runs a CREATE TABLE sql script
Variables passed to the template: table, view
Example
unnest.sql
    CREATE TABLE {{table}}(
        id SERIAL PRIMARY KEY,
        name TEXT
    );
    {%-split-%}
    INSERT INTO {{table}}(name)
    SELECT json_array_elements_text(json->'names')
    FROM {{sources}};
pipeline.py
    "unnest": CreateTableSql(
        sql=Path("./unnest.sql").read_text(),
        table=Table("new_table"),
        locals={"sources": self.outputof("other")},
    )
Note
You can use ralsei.utils.folder() to find SQL files relative to the current file
- sql : str | list[str] = None
Sql template strings
Individual statements must be either separated by the {%split%} tag or pre-split into a list
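The two accepted shapes of `sql` (one string with split markers, or a pre-split list) can be normalized as in the rough sketch below. The library handles the tag inside its Jinja environment; the regular expression and `split_statements` here are hypothetical stand-ins that operate on plain text.

```python
import re

# Matches {%split%}, {% split %} and the whitespace-trimming form {%-split-%}
SPLIT_TAG = re.compile(r"\{%-?\s*split\s*-?%\}")


def split_statements(sql):
    """Return a list of non-empty statements from a string or a pre-split list."""
    parts = sql if isinstance(sql, list) else SPLIT_TAG.split(sql)
    return [part.strip() for part in parts if part.strip()]


script = """
CREATE TABLE people(id INTEGER PRIMARY KEY, name TEXT);
{%-split-%}
INSERT INTO people(name) VALUES ('Ada');
"""
statements = split_statements(script)
for statement in statements:
    print(statement)
```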
- table : ralsei.types.Table = None
Table being created
- class ralsei.task.AddColumnsSql
Bases: ralsei.task.base.TaskDef
Adds the specified Columns to an existing Table and runs the SQL script to fill them with data
Variables passed to the template: table
Columns can be defined in the template itself, using {% set columns = [...] %}
Example
postprocess.sql
    {% set columns = [Column("name_upper", "TEXT")] -%}
    UPDATE {{table}}
    SET name_upper = UPPER(name);
pipeline.py
    "postprocess": AddColumnsSql(
        sql=Path("./postprocess.sql").read_text(),
        table=Table("people"),
    )
Note
You can use ralsei.utils.folder() to find SQL files relative to the current file
- sql : str | list[str] = None
Sql template strings
Individual statements must be either separated by the {%split%} tag or pre-split into a list
- table : ralsei.graph.Resolves[ralsei.types.Table] = None
Table to add columns to
May be the output of another task
- columns : Optional[Sequence[ralsei.types.ColumnBase]] = None
These column definitions take precedence over those defined in the template
- class ralsei.task.MapToNewTable
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, mapping a single row to one or more rows in a new table
Variables passed to jinja:
- table=table
- source=source_table
- is_done=is_done_column (as ralsei.types.Identifier)
Example
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewTable,
        Table,
        ValueColumn,
        Placeholder,
        compose,
        add_to_input,
        pop_id_fields,
    )

    # download(url) is a helper returning the page HTML; it is not defined in this example

    # Find subjects on the hub page
    def find_subjects(hub_url: str):
        html = download(hub_url)
        sel = Selector(html)
        for row in sel.xpath("//table/tr"):
            yield {
                "subject": row.xpath("a/text()").get(),
                "url": row.xpath("a/@href").get()
            }

    # Download all pages in a subject rating
    def download_pages(url: str):
        next_url = url
        page = 1
        while next_url is not None:
            html = download(next_url)
            yield { "page": page, "html": html }
            sel = Selector(html)
            # Follow the href of the "next" link; None ends the loop
            next_url = sel.xpath("//a[@id = 'next']/@href").get()
            page += 1

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "subjects": MapToNewTable(
                    table=Table("subjects"),
                    columns=[
                        "id SERIAL PRIMARY KEY",
                        ValueColumn("subject", "TEXT"),
                        ValueColumn("url", "TEXT"),
                    ],
                    fn=compose(
                        find_subjects,
                        add_to_input(hub_url="https://rating.com/2022")
                    )
                ),
                "pages": MapToNewTable(
                    source_table=self.outputof("subjects"),
                    select="""\
                    SELECT id, url FROM {{source}}
                    WHERE NOT {{is_done}}""",
                    table=Table("pages"),
                    columns=[
                        ValueColumn(
                            "subject_id",
                            "INT REFERENCES {{source}}(id)",
                            Placeholder("id")
                        ),
                        ValueColumn("page", "INT"),
                        ValueColumn("html", "TEXT"),
                        "date_downloaded DATE DEFAULT NOW()",
                    ],
                    is_done_column="__downloaded",
                    fn=compose(download_pages, pop_id_fields("id"))
                )
            }
The table body is generated from all columns; the INSERT statement is generated only from ralsei.types.ValueColumnBase columns
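The split between "all columns" and "value columns only" can be pictured with the sketch below. `ValueColumnSketch` and `build_statements` are hypothetical, and the exact SQL that ralsei renders may differ; only the selection rule is taken from the text above.

```python
from dataclasses import dataclass


@dataclass
class ValueColumnSketch:
    """Hypothetical stand-in for a value column: a name plus an SQL type."""

    name: str
    type: str


def build_statements(table, columns):
    # Every entry (plain string or value column) contributes to the table body
    definitions = [
        c if isinstance(c, str) else f"{c.name} {c.type}" for c in columns
    ]
    # Only value columns take part in the INSERT statement
    value_columns = [c for c in columns if not isinstance(c, str)]
    names = ", ".join(c.name for c in value_columns)
    params = ", ".join(f":{c.name}" for c in value_columns)
    create = f"CREATE TABLE {table}({', '.join(definitions)});"
    insert = f"INSERT INTO {table}({names}) VALUES ({params});"
    return create, insert


create, insert = build_statements(
    "subjects",
    [
        "id SERIAL PRIMARY KEY",
        ValueColumnSketch("subject", "TEXT"),
        ValueColumnSketch("url", "TEXT"),
    ],
)
print(create)
print(insert)
```

In this sketch the `id` column is left to its database default because it is a plain string and therefore never appears in the INSERT.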
- table : ralsei.types.Table = None
The new table being created
- columns : Sequence[str | ralsei.types.ValueColumnBase] = None
Columns (and constraints) that make up the table definition
Additionally, the ralsei.types.ValueColumnBase.value field is used for INSERT statement generation
str columns and ralsei.types.ValueColumn’s type are passed through the jinja renderer
- fn : ralsei.wrappers.OneToMany = None
A generator function, mapping one row to many rows
If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'
Task-scoped context-manager arguments passed to fn
Example
    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver

    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        yield browser
        browser.quit()

    def scrape_page(browser: webdriver.Chrome):
        ...

    MapToNewTable(
        fn=scrape_page,
        context={"browser": browser_context}
    )
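The idea of a task-scoped resource (opened once, shared by every call to `fn`, closed when the task ends) can be sketched with the standard library alone. This assumes each `context` value is a zero-argument factory returning a context manager, which is a simplification of the library's own ContextManager type; `run_with_context` and `fake_browser` are hypothetical.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def fake_browser():
    events.append("open")
    try:
        yield "browser"
    finally:
        events.append("close")


def scrape_page(url, browser):
    return f"{browser} fetched {url}"


def run_with_context(fn, rows, context):
    # Enter every context manager once, then reuse the resources for all rows
    with ExitStack() as stack:
        resources = {
            name: stack.enter_context(factory())
            for name, factory in context.items()
        }
        return [fn(**row, **resources) for row in rows]


results = run_with_context(
    scrape_page,
    [{"url": "a"}, {"url": "b"}],
    {"browser": fake_browser},
)
print(results)
print(events)
```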
- select : Optional[str] = None
The SELECT statement that generates rows passed to fn as arguments
If not specified, fn will only run once with 0 arguments.
- source_table : Optional[ralsei.graph.Resolves[ralsei.types.Table]] = None
The table where the input rows come from
If not creating is_done_column, you can leave it as None.
May be the output of another task.
- is_done_column : Optional[str] = None
Create a boolean column with the given name in source_table that tracks which rows have been processed
If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
Note
Make sure to include WHERE NOT {{is_done}} in your select statement
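The stop-and-resume behaviour can be demonstrated with an in-memory SQLite database. This is a sketch of the general technique (a done flag plus a commit per row), not of ralsei's internals; the table layout, `process_pending` and `flaky_download` are hypothetical.

```python
import sqlite3


def process_pending(conn, fn):
    """Run fn on every unprocessed row, committing after each success."""
    rows = conn.execute(
        "SELECT id, url FROM pages WHERE NOT is_done ORDER BY id"
    ).fetchall()
    processed = []
    for row_id, url in rows:
        html = fn(url)  # may raise; rows handled earlier are already committed
        conn.execute(
            "UPDATE pages SET html = ?, is_done = 1 WHERE id = ?", (html, row_id)
        )
        conn.commit()
        processed.append(row_id)
    return processed


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pages(id INTEGER PRIMARY KEY, url TEXT, html TEXT,"
    " is_done INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany("INSERT INTO pages(url) VALUES (?)", [("a",), ("b",), ("c",)])
conn.commit()

calls = []


def flaky_download(url):
    calls.append(url)
    if url == "b" and calls.count("b") == 1:
        raise ConnectionError("simulated network failure")
    return f"<html>{url}</html>"


try:
    process_pending(conn, flaky_download)
except ConnectionError:
    pass  # the row for "a" stays committed

resumed = process_pending(conn, flaky_download)  # picks up at "b"
```

Without the `WHERE NOT is_done` filter the second call would download "a" again, which is why the note above asks for that condition in the select statement.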
- id_fields : Optional[list[ralsei.types.IdColumn]] = None
Columns that uniquely identify a row in source_table, so that you can update is_done_column
This argument takes precedence over id_fields inferred from fn’s metadata
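The precedence rule (explicit `id_fields` first, metadata left on `fn` second) might look roughly like this. The wrapper below is a hypothetical imitation: the attribute name `id_fields` on the function and the choice to copy the popped ids back into each output row are assumptions, not documented behaviour of ralsei.wrappers.pop_id_fields().

```python
def pop_id_fields_sketch(*id_fields):
    """Hypothetical wrapper: hide id fields from fn and record them as metadata."""

    def decorator(fn):
        def wrapper(**row):
            ids = {name: row.pop(name) for name in id_fields}
            for output in fn(**row):
                yield {**output, **ids}  # assumption: ids are added back

        wrapper.id_fields = list(id_fields)  # metadata left for the task
        return wrapper

    return decorator


def resolve_id_fields(fn, explicit=None):
    # An explicit argument wins; otherwise fall back to the function's metadata
    if explicit is not None:
        return explicit
    return getattr(fn, "id_fields", None)


@pop_id_fields_sketch("id")
def count_pages(url):
    yield {"page": 1, "url": url}


inferred = resolve_id_fields(count_pages)
overridden = resolve_id_fields(count_pages, ["subject_id"])
rows = list(count_pages(id=5, url="x"))
```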
- class ralsei.task.MapToNewColumns
Bases: ralsei.task.base.TaskDef
Applies the provided map function to a query result, saving outputs into new columns on the same row
Variables passed to jinja:
- table=table
- is_done=is_done_column (as ralsei.types.Identifier)
Example
    import requests
    from parsel import Selector
    from ralsei import (
        Pipeline,
        MapToNewColumns,
        Table,
        ValueColumn,
        Sql,
        compose_one,
        pop_id_fields,
    )

    def download(url: str):
        response = requests.get(url)
        response.raise_for_status()
        return {"html": response.text}

    def parse(html: str):
        sel = Selector(html)
        return {
            "title": sel.xpath("//h1/text()").get(),
            "rating": sel.xpath("//div[@id='rating']/text()").get(),
        }

    class MyPipeline(Pipeline):
        def create_tasks(self):
            return {
                "download": MapToNewColumns(
                    table=Table("pages"),
                    select="SELECT id, url FROM {{table}} WHERE NOT {{is_done}}",
                    columns=[
                        ValueColumn("html", "TEXT"),
                        ValueColumn("date_downloaded", "DATE", Sql("NOW()")),
                    ],
                    is_done_column="__downloaded",
                    fn=compose_one(download, pop_id_fields("id")),
                ),
                "parse": MapToNewColumns(
                    table=self.outputof("download"),
                    select="SELECT id, html FROM {{table}}",
                    columns=[
                        ValueColumn("title", "TEXT"),
                        ValueColumn("rating", "TEXT"),
                    ],
                    fn=compose_one(parse, pop_id_fields("id")),
                ),
            }
- table : ralsei.graph.Resolves[ralsei.types.Table] = None
Table to add columns to
May be the output of another task
- columns : Sequence[ralsei.types.ValueColumnBase] = None
List of new columns
Used for ADD COLUMN and UPDATE statement generation.
- fn : ralsei.wrappers.OneToOne = None
Function that maps one row to values of the new columns in the same row
If the id_fields argument is omitted, will try to infer the id_fields from metadata left by ralsei.wrappers.pop_id_fields()
- context : dict[str, ralsei.contextmanagers.ContextManager[Any]] = 'field(...)'
Task-scoped context-manager arguments passed to fn
Example
    from ralsei.contextmanagers import reusable_contextmanager_const
    from selenium import webdriver

    @reusable_contextmanager_const
    def browser_context():
        browser = webdriver.Chrome()
        yield browser
        browser.quit()

    def scrape_page(browser: webdriver.Chrome):
        ...

    MapToNewColumns(
        fn=scrape_page,
        context={"browser": browser_context}
    )
- is_done_column : Optional[str] = None
Create a boolean column with the given name in table that tracks which rows have been processed
If set, the task will commit after each successful run of fn, allowing you to stop and resume from the same place.
Note
Make sure to include WHERE NOT {{is_done}} in your select statement
- id_fields : Optional[list[ralsei.types.IdColumn]] = None
Columns that uniquely identify a row in table, so that you can update is_done_column
This argument takes precedence over id_fields inferred from fn’s metadata
- class ralsei.task.CreateTableTask
Bases: ralsei.task.base.TaskImpl
Base class for a task that performs table creation
All you have to do is call _prepare_table() from within ralsei.task.TaskImpl.prepare().
output, _exists() and _delete() are implemented for you, leaving only the ralsei.task.TaskImpl._run() part
Example
    from pathlib import Path

    import pandas as pd

    class UploadCsv(TaskDef):
        table: Table
        path: Path

        class Impl(CreateTableTask):
            def prepare(self, this: "UploadCsv"):
                self._prepare_table(this.table)
                self.__path = this.path

            def _run(self, conn: ConnectionEnvironment):
                with self.__path.open() as file:
                    pd.read_csv(file).to_sql(
                        self._table.name,
                        conn.sqlalchemy,
                        schema=self._table.schema
                    )
- _table : ralsei.types.Table = None
- _drop_sql : sqlalchemy.sql.elements.TextClause = None
- _prepare_table(table: ralsei.types.Table, view: bool = False)
- _exists(conn: ralsei.connection.ConnectionEnvironment) -> bool
- _delete(conn: ralsei.connection.ConnectionEnvironment)
- class ralsei.task.AddColumnsTask
Bases: ralsei.task.base.TaskImpl
Base class for a task that adds columns to a table
All you have to do is call _prepare_columns() from within ralsei.task.TaskImpl.prepare().
output, _exists() and _delete() are implemented for you, leaving only the ralsei.task.TaskImpl._run() part
- _table : ralsei.types.Table = None
- _columns : list[ralsei.types.ColumnRendered] = None
- _add_columns : ralsei.db_actions.AddColumns = None
- _drop_columns : ralsei.db_actions.DropColumns = None
- _prepare_columns(table: ralsei.graph.Resolves[ralsei.types.Table], columns: collections.abc.Sequence[ralsei.types.ColumnBase], *, if_not_exists: bool = False)
- _exists(conn: ralsei.connection.ConnectionEnvironment) -> bool
- _delete(conn: ralsei.connection.ConnectionEnvironment)
- ralsei.task.ROW_CONTEXT_ATRRIBUTE = '__ralsei_row_context'
Attribute that gets added to exceptions that occurred in a row processing context.
Stores the popped fields of the aforementioned row
- ralsei.task.ROW_CONTEXT_VAR : contextvars.ContextVar[dict] = 'ContextVar(...)'
ContextVar storing the popped fields of the currently processed row (used by ralsei.console.console for logging purposes)
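How these two pieces of data could work together is sketched below using only `contextvars`. The attribute value `'__ralsei_row_context'` comes from the entry above; the local names `ROW_CONTEXT_ATTR` and `ROW_CONTEXT`, and the helper `call_with_row_context`, are hypothetical, so this shows the general pattern and not the library's code.

```python
import contextvars

ROW_CONTEXT_ATTR = "__ralsei_row_context"  # attribute name documented above
ROW_CONTEXT = contextvars.ContextVar("row_context")  # local stand-in variable


def call_with_row_context(fn, popped_fields, **kwargs):
    """Call fn while exposing the row's popped fields; tag errors with them."""
    token = ROW_CONTEXT.set(popped_fields)
    try:
        return fn(**kwargs)
    except Exception as err:
        # Attach the identifying fields so a handler can report the failed row
        setattr(err, ROW_CONTEXT_ATTR, popped_fields)
        raise
    finally:
        ROW_CONTEXT.reset(token)


def parse(html):
    # Code running inside the row context can read the current row's fields
    row = ROW_CONTEXT.get()
    raise ValueError(f"cannot parse row {row['id']}")


try:
    call_with_row_context(parse, {"id": 7}, html="<p>")
except ValueError as err:
    failed_row = getattr(err, ROW_CONTEXT_ATTR)
    message = str(err)
```

Resetting the variable in `finally` keeps one row's fields from leaking into the processing of the next.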