transform operator
When to use the transform operator
The transform operator allows you to implement the T of an ELT system by running a SQL query. Each step of the transform pipeline creates a new table from the SELECT statement and enables tasks to pass those tables as if they were native Python objects.
The transform operator treats values in double curly braces as Airflow Jinja templates. For more details on templating, see Templating.
There are two main uses for the transform operator.
- Case 1: Passing tables between tasks while completing data transformations.
The following example applies a SQL SELECT statement to an imdb_movies table with templating and saves the result to a temporary Astro SDK table.

Note that input_table in the double curly braces is treated as an Airflow Jinja template. It is not an f-string. Building SQL with f-strings exposes the query to SQL injection. For security, you must explicitly identify tables in the function parameters by typing a value as a Table. Only then will the transform operator treat the value as a table.

    from astro import sql as aql
    from astro.table import Table

    @aql.transform()
    def top_five_animations(input_table: Table):  # skipcq: PYL-W0613
        return """
            SELECT *
            FROM {{input_table}}
            WHERE genre1 = 'Animation'
            ORDER BY rating DESC
            LIMIT 5;
        """
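The difference between templating and f-strings can be sketched with the standard library alone. The snippet below is not the SDK's implementation: the `Table` class and the `render` helper are hypothetical stand-ins, the sample rows are made up, and SQLite is assumed only because it ships with Python. It shows one way a `{{name}}` placeholder can become either a validated identifier (for values typed as a table) or a bound parameter (for everything else), while the f-string version lets the input rewrite the query.

```python
import re
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class Table:
    """Hypothetical stand-in for the SDK's Table type (not the real class)."""

    name: str

    def __post_init__(self):
        # Accept only plain identifiers so a table name cannot carry SQL.
        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", self.name):
            raise ValueError(f"invalid table name: {self.name!r}")


def render(sql, **context):
    """Replace each {{name}} with a quoted identifier or a bound parameter."""
    params = {}

    def substitute(match):
        key = match.group(1)
        value = context[key]
        if isinstance(value, Table):
            return f'"{value.name}"'  # validated identifier, safe to inline
        params[key] = value  # any other value is bound, never inlined
        return f":{key}"

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, sql), params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE imdb_movies (title TEXT, genre1 TEXT, rating REAL)")
conn.executemany(
    "INSERT INTO imdb_movies VALUES (?, ?, ?)",
    # Made-up sample rows, for illustration only.
    [("Up", "Animation", 8.3), ("Heat", "Crime", 8.3), ("Cars", "Animation", 7.2)],
)

malicious = "Animation' OR '1'='1"

# f-string: the input becomes part of the SQL text and rewrites the filter.
unsafe_sql = f"SELECT title FROM imdb_movies WHERE genre1 = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Template: the table name is validated, the value travels as a parameter.
safe_sql, params = render(
    "SELECT title FROM {{input_table}} WHERE genre1 = {{genre}}",
    input_table=Table("imdb_movies"),
    genre=malicious,
)
safe_rows = conn.execute(safe_sql, params).fetchall()
```

In this sketch the f-string query returns every row because the injected `OR '1'='1'` is always true, while the templated query returns nothing because the whole input is compared as a single literal value.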
The following example applies a similar SELECT statement to the imdb_movies table, this time ordering by ascending rating, and saves the result to a temporary Astro SDK table.

    @aql.transform()
    def last_five_animations(input_table: Table):  # skipcq: PYL-W0613
        return """
            SELECT *
            FROM {{input_table}}
            WHERE genre1 = 'Animation'
            ORDER BY rating ASC
            LIMIT 5;
        """
You can easily pass tables between tasks when completing a data transformation.
    @aql.transform
    def union_top_and_last(first_table: Table, second_table: Table):  # skipcq: PYL-W0613
        """Union `first_table` and `second_table` tables to create a simple dataset."""
        return """
            SELECT title, rating FROM {{first_table}}
            UNION
            SELECT title, rating FROM {{second_table}};
        """
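The idea that each step creates a new table from its SELECT statement, and that the next step receives that table as an ordinary value, can be illustrated without Airflow. The following is a rough sketch under stated assumptions, not how the SDK works internally: `materialize` is a hypothetical helper, the rows are made up, SQLite stands in for the real database, and the limit is 2 instead of 5 to keep the sample small.

```python
import itertools
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE imdb_movies (title TEXT, genre1 TEXT, rating REAL)")
conn.executemany(
    "INSERT INTO imdb_movies VALUES (?, ?, ?)",
    [  # Made-up sample rows, for illustration only.
        ("A", "Animation", 8.5),
        ("B", "Animation", 8.0),
        ("C", "Animation", 7.0),
        ("D", "Animation", 6.0),
        ("E", "Drama", 9.0),
    ],
)

_counter = itertools.count()


def materialize(select_sql):
    """Store the result of a SELECT in a new temporary table and return its name."""
    # The name is generated here, never taken from user input.
    name = f"_tmp_{next(_counter)}"
    conn.execute(f"CREATE TEMP TABLE {name} AS {select_sql}")
    return name


# Step 1 and step 2 each produce a table from the source table.
top = materialize(
    "SELECT * FROM imdb_movies WHERE genre1 = 'Animation' "
    "ORDER BY rating DESC LIMIT 2"
)
last = materialize(
    "SELECT * FROM imdb_movies WHERE genre1 = 'Animation' "
    "ORDER BY rating ASC LIMIT 2"
)

# Step 3 consumes the two tables produced by the earlier steps.
union = materialize(
    f"SELECT title, rating FROM {top} UNION SELECT title, rating FROM {last}"
)

result = conn.execute(
    f"SELECT title, rating FROM {union} ORDER BY rating DESC"
).fetchall()
```

Each call returns only a table name, so downstream steps depend on upstream results the same way Python functions depend on each other's return values.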
- Case 2: Passing a Pandas dataframe between tasks while completing data transformations.
The following example shows how you can quickly pass a table and a Pandas dataframe between tasks when completing a data transformation.
    import pandas as pd

    @aql.transform
    def union_table_and_dataframe(input_table: Table, input_dataframe: pd.DataFrame):  # skipcq: PYL-W0613
        """Union `input_table` table and `input_dataframe` dataframe to create a simple dataset."""
        return """
            SELECT title, rating FROM {{input_table}}
            UNION
            SELECT title, rating FROM {{input_dataframe}};
        """
If you want to run a query stored in a SQL file rather than returned from a decorated function, use the transform_file operator instead.
Parameters
query_modifier - The query_modifier parameter allows you to define statements to run before and after the run_raw_sql main statement. For instance, to associate a Snowflake query tag, you can use query_modifier=QueryModifier(pre_queries=["ALTER SESSION SET QUERY_TAG=<my-query-tag>"]).
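To make the "before and after" behavior concrete, here is a minimal sketch of the pattern using SQLite from the standard library. It does not reproduce the SDK's code: `QueryModifierSketch` and `run_with_modifier` are hypothetical names, and the `post_queries` field is an assumption that mirrors the "after" half of the description above.

```python
import sqlite3
from dataclasses import dataclass, field


@dataclass
class QueryModifierSketch:
    """Hypothetical stand-in: statements to run around a main query."""

    pre_queries: list = field(default_factory=list)
    post_queries: list = field(default_factory=list)  # assumed counterpart


def run_with_modifier(conn, main_sql, modifier):
    """Run pre-queries, then the main statement, then post-queries."""
    for statement in modifier.pre_queries:
        conn.execute(statement)
    rows = conn.execute(main_sql).fetchall()
    for statement in modifier.post_queries:
        conn.execute(statement)
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE audit (event TEXT)")

modifier = QueryModifierSketch(
    pre_queries=["INSERT INTO audit VALUES ('start')"],
    post_queries=["INSERT INTO audit VALUES ('end')"],
)

# The main statement runs after 'start' is written and before 'end'.
rows = run_with_modifier(conn, "SELECT COUNT(*) FROM audit", modifier)
events = [event for (event,) in conn.execute("SELECT event FROM audit ORDER BY rowid")]
```

The main statement sees exactly one audit row, which confirms that the pre-query ran first and the post-query ran afterwards. A session-level setting such as a Snowflake query tag would follow the same ordering.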