【BigQuery】JSONテクニック。キーが動的な場合はダミーに置換
先日、BigQueryのJSON型データを扱う際に、キーがバラバラで値の抽出処理ができなくて困った。
こちらの記事を参考にして、キーをダミーの文字列に置換して処理する方法でなんとか解決できた。
例えば下記のようなキーにユーザID、値にユーザ名が入っている場合、普通のやり方だとユーザIDを事前に知らないとユーザ名を抽出できない。
ここでユーザIDを一度「KEY」という文字列に置換してから、JSONのパスを「$.KEY」で指定することで、ユーザ名を取り出すことができる。
WITH sample AS (
SELECT JSON '{"8901": "山田 太郎", "8071": "鈴木 次郎"}' AS json_data
)
SELECT
JSON_VALUE(
REGEXP_REPLACE(
TO_JSON_STRING(json_data),
CONCAT('"', key, '"'),
'"__KEY__"'
),
'$.__KEY__'
) AS value
FROM sample,
UNNEST(JSON_KEYS(json_data)) AS key
JSON型のデータを文字列にして、その中のキーをダミーの文字列に変換するという処理は無理矢理感がスゴイけど、こうする他ないこともまあまあるので、知っておくと役に立つかも。
GKE上にAirflowを構築する
概要
TerraformとHelmを使用して、gcloudコマンドやkubectlコマンドを使わずに、Rodel van Rooijen氏のMediumの記事に沿ってGKE上にAirflowクラスタを作成します。
TerraformとHelmのコードは下記のGitHubリポジトリに置いてます。
元の記事とは異なる点がいくつかあるので、ご注意ください。
- Airflow Web UIはインターネットに公開されます。
- KubernetesのPersistent Volumeを手動で作成 (
kubectl apply) する必要はありません。KubernetesのPersistent Volume ClaimによってCompute Diskが自動的に作成されます。詳細はPersistent volumes and dynamic provisioning(永続ボリュームと動的プロビジョニング)を参照してください。 - ログはPodのライフサイクル中のみ利用可能です。ログを永続化したい場合は、Airflow Helm: Manage logs(Airflow Helm:ログの管理)を参照してください。
- Compute Engine DiskはDAGファイルの管理には使用されません。DAGファイルを効率的に管理したい場合は、Airflow Helm: Manage DAGs files(Airflow Helm:DAGファイルの管理)を参照してください。
terraform applyの前に準備すること
terraform applyするには、以下のコマンドラインツールを準備する必要があります。
- gcloud
- docker
- kubectl
- helm
また、下記の点にご注意ください。
- Terraformのコードは、GKEの認証情報を上書きするために
~/.kubeディレクトリを削除します。必要であれば、事前に~/.kubeディレクトリをバックアップしてください。 - Terraformの適用前にGCPプロジェクトが作成済みであり、必要なGCP APIがすべて有効になっている必要があります。
注意事項
下記の点において本番環境レベルのコードではありません。
- デフォルトのVPCネットワークは、カスタムVPCに置き換えるべきです。
- Cloud SQLのパスワードは、Terraformの変数ではなく、Secret Managerなどのより安全な方法で管理すべきです。
- Terraformのstateファイルは、ローカル環境ではなく、GCS (Google Cloud Storage) に保存すべきです。
- Git-syncを使ったDAGファイルの管理の方が便利です。
- ログはPodのライフサイクル中のみ利用可能です。
このAirflow環境は、DAGの開発などに利用できるとは思います。
ハマったところ
Airflow Helm chartには2つの種類があります。公式Helm chartとコミュニティ版です。
それを理解してない状態で調べたので、色々とごちゃごちゃに理解してしまい、時間がかかってしまいました。
このリポジトリでは公式Helm chartを使用してます。
気づき
開発中に気づいたことのメモ。
- Compute Engine DiskはPVC (Persistent Volume Claim) のReadWriteManyアクセスモードをサポートしてない。
standards-rwoStorage Classは、PVC作成直後にCompute Engine Diskを作成しない。
参考資料
Deploying Airflow on GKE using Helm
Workload Identity in GKE with Terraform
Dynamic Provisioning and Storage Classes in Kubernetes
Persistent volumes and dynamic provisioning
Apache Airflow ETL in Google Cloud
Alternative: link Kubernetes ServiceAccounts to IAM
Deploying Airflow on Google Kubernetes Engine with Helm
Deploying Airflow on Google Kubernetes Engine with Helm — Part Two
AirflowのカスタムTrigger
概要
ComposerのAirflowのカスタムTriggerを作成した。
色々と難しかったけど、使えるとリソース消費を抑えられて、コスト抑制できるのでおススメ。
なぜカスタムTriggerを使ったのか
定期的にAPIでBigQueryテーブルの更新状況をチェックして更新されていたら、テーブルに依存する別のテーブルを更新したかった。
AirflowのWorkerでその処理を実行しようとすると、テーブルが更新されるまでCPUとメモリを占有してしまい、お金がかかる。
更新状況チェック対象のテーブルが大量にあるのと、数時間というレベルで待機が必要になる。
そういったユースケースに対して、導入されたのがTriggerプロセス。
AirflowのOperatorでdeferメソッドを実行すると、Triggererプロセスに処理が渡され、非同期で複数の処理をまとめて実行してくれる。
Triggerの処理が完了すると、呼び出し元のOperatorに処理が戻されてタスクが完了する。
Triggererプロセスに処理が渡されている間は、タスクはdefferedステータスになり、Workerで使用されていたリソースは解放されるので、テーブルが更新されるまで待機するといったタスクでもリソース消費を抑えて処理できる。
カスタムTriggerの動かし方
今回はGCP ComposerのAirflowでカスタムTriggerを動かした。
DAGとは違って、ComposerバケットにDAGのPythonファイルを入れても動かない。
TriggererプロセスはAirflowの起動時に読み込まれて実行が開始される。
そのため、PyPIパッケージとしてComposerにインストールする必要がある。
ComposerにカスタムPyPIパッケージをインストールする方法は色々あるが、今回はArtifact Registryの標準リポジトリに生成したPythonパッケージをアップロードして、そこからComposerにインストールした。
その話はまた別途投稿したい。
コード
基本的には下記を参考にすれば、問題なく実装できると思う。
何か不明な点や解決できない問題があれば、メッセージください。
読書メモ:Architecture Patterns with Python
↑2024年で一番の技術書だった。
DDDやClean Architectureの実践についてPythonを題材に学ぶことができる。
概念を紹介するだけの書籍が多いけど、この本はTDDで実際のコードを元に学べる。しかもPythonで。
特にアーキテクチャパターンの実装については勉強になった。
読書メモ ~ ドメイン駆動設計入門
概要
ドメイン駆動設計入門を読んだのでメモ。
間違いがあるかもしれないので、正しい情報は書籍で確認してください。
重要なポイント
ドメイン駆動設計は、解決したい課題の領域について知識を得ながら設計を行う設計手法。
オブジェクト指向プログラミングが重要な要素の1つになっており、本書では設計パターンを中心に解説している。
パターンと対をなすモデリングは本書では、ほとんど扱っていない。あくまでドメイン駆動設計の開発を学ぶのに必要となる知識を学ぶ。
設計パターン
下記のパターンが紹介される。思い出せなくなったら、本書を読み返すと良い。
文字列型などのプリミティブ型を使うのではなく、ドメイン特有の値を表すクラス=値オブジェクトを使用する。例えばお金を表す場合に、int型を使うのではなくmoneyクラスを用意する。
値オブジェクトは比較可能である必要がある。moneyクラスでいうと、金額と通貨が同じ場合に同一であると判定する。
また、不変かつ交換可能である必要がある。moneyクラスを一度インスタンス化したら、インスタンスの属性を変更することはできない(不変性)。
値を変更したい場合には、別の値をもつインスタンスを作成し、既存のインスタンスと交換する必要がある。
一方、エンティティは可変かつ同じ属性であっても区別される。userクラスを考えると、エンティティがわかりやすい。ユーザ名は変更する。そして、同じユーザ名であっても必ずしも同一のユーザであるとは限らない。エンティティはそのような性質を持つ。エンティティの判断基準(値オブジェクトとの違い)は、ライフサイクルをもつかという点である。ユーザのように作成され、いずれ削除されるような概念はエンティティと判断される。
値オブジェクトやエンティティをドメインオブジェクトと呼ぶ。ドメインオブジェクトには文字数や文字種チェックなどのふるまいが定義される。値やエンティティ自身の性質や制限を、ドメインオブジェクトのクラス内に記述する。
一方、ドメインオブジェクト間の制約はドメインサービス(クラス)に記述する。例えば、同じユーザ名の重複を許可しない場合は、UserServiceクラスを作成し、ドメインサービスで重複チェックを行う。
リポジトリは、データストアとの連携に使用する。ビジネスロジック(UserApplicationService、アプリケーションサービス)内に、データストアへのアクセスを直接記述すると、処理内容の把握が難しくなる。そこでリポジトリのインターフェース(IUserRepository)を作成し、具体的なクラス(UserRepository)に処理を記述する。ビジネスロジックのプログラムでは、インターフェースに対して操作を行う。これは依存関係逆転の原則に従っている。UserApplicationServiceはIUserRepositoryに依存し、UserRepositoryはIUserRepositoryに依存している。