DatastreamでCDC運用した半年の振り返り

ogp

この記事はenechain Advent Calendar 2025の15日目の記事です。

データプラットフォームデスクの山田です。普段は分析目的のためのデータパイプラインの構築や運用をしています。

今年からデータパイプラインにDatastreamを導入し、半年ほど運用してきました。簡単に導入でき、今のところ障害といった障害もなく安定して運用できており、とても助かっています。とはいえ、考慮しなければならないことや、困っていることもいくつかでてきました。 本記事では、Datastreamを導入した経緯と、半年運用してみて遭遇した困っている点を紹介します。

現在のDBのデータを取り込む仕組みとその課題

enechainではプロダクトのDBとしてCloudSQL for PostgreSQLを採用しており、分析等の目的のためにBigQueryへとデータを取り込む仕組み(DB Pipeline)を現在運用しています。 DB Pipelineについては過去のこちらの記事で紹介しています。 このDB Pipelineは定期的に起動し、CloudSQL上のテーブルの内容でBigQuery上のテーブルを丸々置き換える、いわゆるテーブルの洗い替えを行い、 BigQuery上への同期を行なっています。 この同期方法は、テーブルのデータを全件取得するため、確実性が高くシンプルで運用しやすい反面、高コストな処理です。

DB Pipelineが運用されるようになり1年程たち、導入プロダクトが増えてきたことで、プロダクト側からいくつかの課題や要望が上がってきました。

1. 同期頻度を上げて欲しい、リアルタイムに計算処理を行いたい

仕組み上はスケジュール間隔を短くして高頻度な同期を実現できますが、プロダクト側が期待している遅延時間内でのBigQueryへの同期を保証するものでもないことと、今現在実現できていたとしても近い将来データ量の増加と共に破綻してしまうことが目に見えています。 もちろん取り込み頻度が高くなることで、BigQueryのコンピュート料金もそれだけ増加してしまいます。 これらのことから、バッチ的な取り込み以外の方法でニアリアルタイムにBigQueryへ同期できるようにする必要がありました。

2. テーブルの更新履歴を記録したい

別のプロダクトからは、更新が多いテーブルの変更履歴を記録しておきたいという要望がありました。 DB Pipelineは取り込み実行時に、前回の取り込み時と新しい取り込み時にできたテーブル間の差分からプライマリキーベースでの変更履歴をつくる機能をすでに実装しています。 しかし、定期実行される間隔ごとの差分しか作れないので、定期実行の間に特定のプライマリキーのレコードが複数回更新された場合などはその差分を作れず、今のままでは要件を満たせませんでした。そのため、CloudSQL側で変更があるたびにその変更を取り込めるような仕組みが新たに必要でした。

Datastreamの採用

こうした課題を解決するために、CDC(Change Data Capture)サービスの導入を検討しはじめました。 CDCはデータソースで発生する変更イベント(INSERT, UPDATE, DELETEなど)をリアルタイムで取得し、他のシステムへと反映させる技術で、 今回の課題や要望を叶えるためにもってこいでした。

CloudSQLからBigQueryへの連携が容易なことから、Google Cloudが提供しているCDCサービスであるDatastreamを採用しました。 他にも、DatastreamはCDC対象とするテーブルを指定できるため、 必要なテーブルのみに絞って、運用コストを抑えながらすばやくCDC連携を開始できるのも採用の決め手でした。

Datastreamを含めたデータパイプライン構成

データパイプラインのアーキテクチャ

Datastreamを導入した後のデータパイプラインは以下です。

Datastream導入後のデータパイプライン

enechainでは、BigQuery上に連携されたデータは分析以外にも重要な業務に利用されることも多いため、 連携データの完全性に対して重きをおいています。 そのため、DB Pipelineによるバッチ的な取り込みをメインに据え、 Datastreamはニアリアルタイム性と変更履歴を提供するためのオプション的な立ち位置としてプロダクトに提供しています。

DB Pipelineは、実行のたびにCloudSQLの内容で洗い替えしている latestテーブル と、 日毎のスナップショットを保持し続ける日付分割テーブルの snapshotテーブル 群を定期的に作っています。

Datastreamは受け取った変更データをBigQueryへ書き込む際に、2つの動作モードを指定できます。 私たちは、「2.テーブルの更新履歴を記録したい」という要望があったことと、 Datastreamを全てのテーブルではなく一部のテーブルから導入するため、基本的にはDB Pipelineによるバッチ的な取り込みを中心に据えていたため、 appendモードを選択しています。 これによって、CloudSQLで行われたINSERTやUPDATEといった変更が、BigQuery上の cdcテーブル に積み重なります。

テーブル構成

既存のDB Pipelineによるバッチ的な取り込みによって作られていたテーブルに加えて、新たにDatastreamを導入することで変更データが積み上がっていく cdcテーブル が追加されました。 latestテーブルをベースにして、cdcテーブルを組み合わせることで、CloudSQLのデータをBigQuery上にニアリアルタイムに再現するrealtimeテーブル(ビュー)を提供しています。これは「メルペイDataPlatformのCDC DataPipeline 」の記事を参考にさせていただきました。

テーブル構成

ただし、cdcテーブルの利用時には注意点があります。 cdcテーブルはテーブルの作成自体をDatastreamが行いますが、その際にパーティショニングされていません。 そのため、realtimeテーブルを参照した際にcdcテーブルのフルスキャンが発生してしまい、 運用を続けて変更データが積み重なっていくと、次第にクエリコストが高くなってしまうことが予想されます。 Datastreamが作ったcdcテーブルを手動で作り直してパーティショニングすることはできそうでしたが、CDC対象のテーブルが増えるたびにテーブルの作り直し作業を行う必要があり、現実的ではありませんでした。 そこで、cdcテーブルは直近数日だけの変更データを保持し、それより古い変更データはarchiveテーブルに退避することで、realtimeテーブル参照時のcdcテーブルのスキャン量を抑えるようにしています。

なお、2025-11-17にどうやらBigQuery上の宛先テーブルにパーティショニングとクラスタリング設定を行えるようになったようです。リリースノートはこちら。 まだこの機能を試せていませんが、今後はもしかしたらarchiveテーブルが不要になるかもしれません!

Datastream導入時の注意点と運用を開始して困っていること

さて、Datastreamを導入し、運用を開始してから半年ほどがたちました。 ここからは導入時に遭遇した問題を元に、注意しておいた方が良い点と、運用していて困っている点をいくつか紹介します。 なお、先述のとおりenechainではPostgreSQL for CloudSQLを利用しているため、 PostgreSQLとDatastreamの組み合わせの際に起こる問題が中心です。

注意点

CloudSQLの再起動が伴う

enechainではPostgreSQL for CloudSQLを利用しています。 PostgreSQLをデータソースとした場合、DatastreamはPostgreSQLの論理レプリケーションを利用するため、 まずは有効化する必要があります。 これはGoogle Cloudのドキュメントにある通りなのですが、データベースフラグのlogical_decodingを有効化する作業は、CloudSQLの再起動を伴うため、注意が必要です。

Datastreamを導入する際には、既存のDBに対して追加で作業をすることになると思うので、 プロダクト側とメンテナンス等の日程調整を行い導入を進める必要があります。

対象テーブルを増やす時は順番が大事

CDC対象のテーブルを増やしたい場合は、以下のステップを踏む必要があります。

  1. 新しい対象テーブルにアクセスできるようにする
  2. 新しい対象テーブルをパブリケーションに追加する
  3. Datastreamの対象に新しい対象テーブルを追加する

パブリケーションに新しいテーブルを追加していない状態で、Datastream側の更新を行うと、Datastreamはエラーを吐いてしまいます。 責任範囲として、ステップ1,2はプロダクト側に実行してもらい、ステップ3をこちらのチームで行う場合がほとんどだと思います。 そのため、プロダクト側のリリーススケジュールとの調整も含め、2チーム間で連携をして順番に進める必要があるので、注意が必要です。

運用を開始して困っていること

サポートされていないデータ型がある

Datastreamではサポートされていないデータ型かいくつかあります。詳しくは既知の制限事項を参照してください。

CDC対象となったテーブルに、サポートされていないデータ型がある場合、そうしたカラムが除外されたcdcテーブルが作られます。 そのため、latestテーブルcdcテーブル間で、スキーマにギャップが生まれてしまい、 realtimeテーブルは少ないカラム数のcdcテーブルに合わせたスキーマにしなければならず、 カラムが欠けたテーブル(ビュー)を提供することになります。

ニアリアルタイム性を求める利用者が、このサポート外のデータ型をもつカラムを利用している場合、 その要件を満たすことができなくなってしまいます。

一方で、こうした取り込み設定はDB上にテーブルができてから後追いで入れることが多いため、 Datastreamでサポートしていないからとテーブルのデータ型の変更をプロダクト側に求めるのはとてもハードルが高く、 現実的ではありません。

現状よい解決方法は見つかっていないです。

プロダクト側の開発フローとの食い合いが悪い

主に開発環境での問題ですが、定期的に開発環境のCloudSQLをスナップショットから復元したり、 マイグレーションの大規模なリバート等を行うプロダクトがあります。 こうした操作が行われると、Datastreamは参照するべきレプリケーションスロットを見失い、エラー状態に陥ってしまいます。

開発環境なのでクリティカルな問題にはなりませんが、 Datastreamにて作ったデータを参照している他のプロダクトの開発に少なからず影響が出てしまいます。 そのため、数日中にはDatastreamの作り直しを行い、変更データの保存を再開したいです。

誰が開発環境のストリームを復旧するのかが困りポイントの1つです。

  1. プロダクトチームから連絡をもらい、データプラットフォームデスクで対応する
  2. ドキュメントを整備し、プロダクトチームに操作を覚えてもらう

現状は前者で対応していますが、どちらの方法もプロダクト側に本来の開発以外の手間を取らせてしまう点が課題です。

他にも、開発環境でありがちなシチュエーションだと思いますが、 PostgreSQLに手でクエリを流してテーブルを作り直すケースです。 テーブルが削除されると、パブリケーションの対象ではなくなってしまいます。 その場合、Datastreamとしてはエラーもなく、変更データの取り込みが行われなくなってしまいます。

なお、Datastreamがエラーで停止したのちも、PostgreSQL側のレプリケーションスロットが健在だった場合、 変更データが際限なくレプリケーションスロットに溜まっていきます。 一方で、Datastreamは作成時に、対象となるレプリケーションスロットに溜まっている変更データの最も古いデータからではなく、 そのタイミング以降のデータを取り込み対象とし、その間の変更データは破棄してしまうので、注意が必要です。 レプリケーションスロットに変更データがあまりに多く溜まっていると、最新の変更データにたどりつくまでに時間がかかるため、 レプリケーションスロットの再作成をしてからの方が早い場合もあります。

PostgreSQL for CloudSQLのメジャーバージョンアップ時にレプリケーションスロットが消える

PostgreSQLはメジャーバージョンアップ時に、レプリケーションスロットの情報を引き継がないため、 メジャーバージョンアップ作業後にレプリケーションスロットの再作成が必要です。 この場合、Datastreamの再作成は不要で、Google Cloudのコンソールから「ストリームの復元」をポチっとすることで復旧できるため、対応自体は簡単です。

とはいえ、プロダクトに何かしらの方法でレプリケーションスロットを作成するクエリを実行してもらわないといけないため、 緊張感のあるクエリの直接実行か、わざわざマイグレーションファイルを用意するといったと手間をプロダクト側に取らせることになります。

今後 : 任意の時刻を再現するビューの提供

任意の時刻のテーブルの状態を再現するビューの提供をして、以下の課題を解決できるのではないかと考えています。

  • latestテーブルは毎日洗い替えなため、集計するたびに内容が変化してしまい、過去の日付の特定の時刻のデータを利用して再集計を行なうことができない
  • バッチ的な取り込みが開始されて終了するまでの時間は不定なため、厳密に特定の時刻の状態をBigQuery上に再現できない

DB Pipelineで取り込んだテーブルデータのスナップショットsnapshotテーブルと、変更データ(cdcテーブルarchiveテーブル)を合わせることで、任意の時刻のCloudSQLの状態を再現できるはずです。 ただ、ベースとなるsnapshotテーブルは日付分割テーブルなため、スキーマは古いままですが、 cdcテーブルはDatastreamによってカラム追加が行われていきます。 こうしたテーブル間のスキーマのギャップを考慮しながら組み合わせる必要がありそうです。

おわりに

ニアリアルタイムなBigQueryへの同期、変更履歴の保存のために、CDCサービスのDatastreamを導入し、 運用時に遭遇した困りごとを紹介しました。

困っている点をいくつか紹介はしましたが、 サービス提供時のCloudSQLでは行わないような操作のタイミングでの困りごとが中心で、 総じて安定した稼働をしてくれており、採用時の狙い通り平常時の運用自体はとても楽です。 まだまだCDC対象テーブルが少ない状態なので、利用を拡大していき、 DB Pipelineによる取り込み頻度を抑えて全体のコストの最適化を行っていきたいと考えています。

enechainでは、巨大なマーケットを支えるデータ基盤を一緒に構築する仲間を募集しています。 興味のある方は、ぜひ以下のリンクからご応募ください! その他のポジションも含め、日本のエネルギー業界をテクノロジーで変革する仲間を募集しています。

tech.enechain.com

herp.careers