こんにちは。バクラク事業部 機械学習・データ部 データグループの@civitaspoです。もう年の瀬ですが、皆さんはこの一年どんな年だったでしょうか?私はSnowflake一色の一年でした。データ基盤の主要技術をBigQueryからSnowflakeへ移管するプロジェクトの推進を行い、その間Snowflakeを学び、発信し、Snowfalke九州ユーザーグループの立ち上げも行いました。
また、Snowflake Squadへ選出され、Snowflakeチョットワカル人材認定もされました。
総じて、最高な一年でした。来年はSnowflakeを中心に、より高度なデータ活用を推進していくぞ 💪
さて、今回はタイトルのとおり、 dbt-snowflake の Python Model で Custom Materialization を実装する方法を書きます。今回はStored Procedureにフォーカスして書きますが、UDFなど他のオブジェクトを実装するときも同様の手法で書けるので、Python Model で Custom Materialization を実装したいと思っている方は読んでもらえると嬉しいです。
はじめに、本記事に登場する用語をいくつか説明したあと、本題となるdbt Python Modelを使ってCustom Materializationを実装する方法を書きます。
dbt Python Modelとは
dbt Python Modelは、dbtのSQLベースのモデリング機能を拡張し、Pythonを使用してデータ変換を行える機能です。Snowflakeの場合、Snowpark Python APIを利用して、データフレーム操作やPythonの豊富なライブラリ(Pandasなど)を活用した複雑なデータ処理を実現できます。従来のSQLでは難しかった機械学習モデルの適用や高度な統計計算なども、Python Modelを使用することで簡単に実装できるようになりました。
現状、dbt Python Modelは、dbtがサポートしているmaterializationのうち、table materializationとincremental materializationのみをサポートしています。
dbt Custom Materializationとは
dbt Custom Materializationは、独自のmaterialization戦略を実装できる機能です。デフォルトのmaterialization(view, table, incremental, ephemeral)以外にも、要件に合わせた独自のデータ保存・更新方法を定義することができます。このCustom Materialization機能は、dbtの拡張性の高さを象徴するような機能で、高度なデータパイプラインの要件に柔軟に対応できる強力なツールとなっています。例えば、特定のテーブル構造の自動生成や、複雑なマージ戦略の実装、あるいはカスタムのバックフィル処理など、組織固有のニーズに合わせた実装が可能です。
本記事では、このCustom Materialization機能を使って、SnowflakeのStored Procedureを定義する方法を書きます。なお、Custom Materialization実装に関する基礎的な理解は本記事では割愛します。Custom Materializationに関する基礎を理解したい方は、公式ドキュメントに記載されているガイドをご一読ください。
Snowflake Stored Procedureとは
Snowflake Stored Procedureは、Snowflake上で手続き的な処理を定義・実行できる機能です。SQLだけでなく、JavaScriptやPython、JavaやScalaといったプログラム言語を使用して、複雑なビジネスロジックやデータ処理を実装することができます。
本記事では、Snowflake Stored ProcedureをPythonで実装することを想定したCustom Materializationを実装します。
dbt Python Modelを使ったCustom Materialization実装
ここから本題となる実装内容について書いていきます。
実装したコード
まず、最初に実装したコードを載せます。以下のコードをコピーして使用すれば、SnowflakeのStored ProcedureをdbtのPython Modelで実装することができます。
/* ./macros/materializations/procedure/snowflake__procedure.sql */ {%- materialization procedure, adapter='snowflake', supported_languages=['sql', 'python'] -%} {%- set identifier = model['alias'] -%} {%- set language = model['language'] -%} {%- set compiled_code = model['compiled_code'] -%} {%- set arguments = config.get('arguments', default=[]) -%} -- name, type, default {%- set return_type = config.get('return_type', default='varchar' ) -%} {%- set execute_as = config.get('execute_as', default='caller' ) -%} {%- set runtime_version = config.get('runtime_version', default='3.11' ) -%} {%- set packages = config.get('packages', default=[]) -%} {%- set imports = config.get('imports', default=[]) -%} {%- set external_access_integrations = config.get('external_access_integrations', default=[]) -%} {%- set secret_mapping = config.get('secret_mapping', default={}) -%} {%- set comment = config.get('comment', default='') -%} {%- for arg in arguments -%} {%- if not arg.get('name', None) -%} {{ exceptions.raise_compiler_error("Argument 'name' is required for Python procedures") }} {%- endif -%} {%- if not arg.get('type', None) -%} {{ exceptions.raise_compiler_error("Argument 'type' is required for Python procedures") }} {%- endif -%} {%- endfor -%} {%- if language == 'python' -%} {%- if 'snowflake-snowpark-python' not in packages -%} {{ exceptions.raise_compiler_error("Package 'snowflake-snowpark-python' is required for Python procedures") }} {%- endif -%} -- https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-python#prerequisites-for-writing-stored-procedures-locally {%- set supported_python_runtime_versions = ['3.8', '3.9', '3.10', '3.11'] -%} {%- if runtime_version not in supported_python_runtime_versions -%} {{ exceptions.raise_compiler_error("Runtime version '" ~ runtime_version ~ "' is not supported for Python procedures. Supported versions: " ~ supported_python_runtime_versions | join(', ')) }} {%- endif -%} {%- endif -%} {%- set target_relation = api.Relation.create( identifier=identifier, schema=schema, database=database) -%} {%- set has_transactional_hooks = (hooks | selectattr('transaction', 'equalto', True) | list | length) > 0 %} {{ run_hooks(pre_hooks, inside_transaction=False) }} -- transaction start {{ run_hooks(pre_hooks, inside_transaction=True) }} {% call statement('main') -%} {{ snowflake_create_stored_procedure_statement( relation=target_relation, language=language, arguments=arguments, return_type=return_type, execute_as=execute_as, runtime_version=runtime_version, packages=packages, imports=imports, external_access_integrations=external_access_integrations, secret_mapping=secret_mapping, comment=comment, compiled_code=compiled_code ) }} {%- endcall %} {{ run_hooks(post_hooks, inside_transaction=True) }} {{ adapter.commit() }} -- transaction end {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} {%- endmaterialization -%}
/* ./macros/materializations/procedure/snowflake_create_procedure_statement.sql */ {%- macro snowflake_create_stored_procedure_statement( relation, language, arguments, return_type, execute_as, compiled_code, runtime_version, packages, imports, external_access_integrations, secret_mapping, comment ) -%} {{ log("Creating Stored Procedure " ~ relation) }} CREATE OR REPLACE PROCEDURE {{ relation.include(database=(not temporary), schema=(not temporary)) }}( {%- for arg in arguments %} {{ arg['name'] }} {{ arg['type'] }} {% if arg.get('default', none) is not none%}default {{ arg['default'] }}{% endif %} {%- if not loop.last %},{% endif %} {%- endfor %} ) returns {{ return_type }} language {{ language }} {%- if language == 'python' %} runtime_version = '{{ runtime_version }}' {%- if packages | length > 0 %} packages = ( {%- for p in packages %} '{{ p }}'{% if not loop.last %},{% endif %} {%- endfor %} ) {%- endif %} {%- if imports | length > 0 %} imports = ( {%- for i in imports %} '{{ i }}'{% if not loop.last %},{% endif %} {%- endfor %} ) {%- endif %} handler = 'main' {%- if external_access_integrations | length > 0 %} external_access_integrations = ( {%- for e in external_access_integrations %} {{ e }}{% if not loop.last %},{% endif %} {%- endfor %} ) {%- endif %} {%- if secret_mapping | length > 0 %} secrets = ( {%- for k, v in secret_mapping.items() %} '{{ k }}' = {{ v }}{% if not loop.last %},{% endif %} {%- endfor %} ) {%- endif %} {%- endif %} comment = $$ {{ comment }} $$ execute as {{ execute_as }} AS $$ {{ compiled_code }} {% if language == 'python' %} def main(session, *args, **kwargs): dbt = dbtObj(session.table) model(dbt, session) return procedure(session, *args, **kwargs) {% endif %} $$ ; {%- endmacro -%}
実装したコードの使用例
以下のように、 materialized="procedure" を指定することで Stored Procedure を定義できるようになります。Stored Procedureでは procedure メソッドがCallされます。この例では第一引数に与えられた文字列のPrefixに hello を加えた文字列を返すだけのStored Procedureが定義されます。
def model(dbt, session): dbt.config( enabled=True, materialized="procedure", database="example_db", schema="procedures", alias="example_procedure", arguments=[ # Stored Procedureの引数定義 {"name": "arg1", "type": "varchar"}, {"name": "arg2", "type": "number", "default": "5"}, ], runtime_version="3.11", packages=[ # 使用したいライブラリの定義 "snowflake-snowpark-python", "pandas", ], external_access_integrations=["example_api"], # 外部APIをCallする場合、External Access Integrationの指定が必要 secret_mapping={ # 秘匿情報を扱う場合の指定 "example_token": "example_db.secrets.example_token", }, ) return None from snowflake.snowpark import Session import pandas as pd def procedure( session: Session, arg1 str, arg2: int, ) -> str: return "hello " + arg1
dbt Python Modelを使ったCustom Materialization実装の解説
ここから実装したコードを詳しく解説していきます。 snowflake__procedure.sql と snowflake_create_procedure_statement.sql という2つのファイルで構成されています。Custom Materialization の起点となるコードは snowflake__procedure.sql に実装されていて、snowflake_create_procedure_statement.sql はsnowflake__procedure.sql から呼ばれる構成となっています。
snowflake__procedure.sql
snowflake__procedure.sqlについて詳しく説明していきます。このファイルはdbtのCustom Materializationを実装する上で必須の処理とdbt Python Modelを定義する際に必要なconifgパラメータの定義を行っています。
{%- materialization procedure, adapter='snowflake', supported_languages=['sql', 'python'] -%}
この記述では、 procedure という名前の materialization を Snowflake 向けに定義しています。supported_languages には sql と python を指定しています。現状のdbtではこの2つのみ使用可能です。
{%- set identifier = model['alias'] -%}
{%- set language = model['language'] -%}
{%- set compiled_code = model['compiled_code'] -%}
ここでは model オブジェクトから処理に必要な情報を取得しています。language パラメータはファイルの拡張子によって sql または python が格納されています。
以降の記述は抜粋する量が多くなるので、抜粋は割愛して説明を進めます。run_hooks の処理までの記述では、materialization に必要なパラメータの取得を行っています。SnowflakeのStored Procedureを定義するために必要なパラメータをconfigで指定してもらう想定です。
その後、snowflake_create_procedure_statement.sqlのcallに続きます。
snowflake_create_procedure_statement.sql
このファイルはjinja2テンプレートが大量に記載されていて複雑に見えますが、SnowflakeのStored Procedureを定義するためのSQLを組み立てています。
特筆すべきは {% if language == 'python' %} から始まる以下の記述です。
{% if language == 'python' %}
def main(session, *args, **kwargs):
dbt = dbtObj(session.table)
model(dbt, session)
return procedure(session, *args, **kwargs)
{% endif %}
この記述は、dbt-snowflakeのPython Modelによるtable materializationの実装を模倣しています。
dbt Python Modelでは、compile時にdbtプロジェクト内のリソースへアクセスするためのアダブターが挿入されます。上記の実装は、このアダプターを使用するための実装です。
これら2つのファイルによってSnowflake Stored Procedureをdbt Python Modelを使ったCustom Materializationで実装できるようになります。
おわりに
本記事では、dbt Python ModelにおけるCustom Materializationの実装方法について、Snowflake Stored Procedureを例に解説しました。dbt Python Modelを使ったCustom Materializationの例はまだあまり世の中に多くないので、ぜひ本記事を参考にしてCustom Materializationの実装にトライしていただけると嬉しいです。
LayerXでは一緒にデータ基盤を作ってくれる仲間を募集しています。ちょっとでも興味のある方は一度ぜひお話しましょう!