newmo 技術ブログ

技術で地域をカラフルに

ログのテイルベースサンプリングを OpenTelemetry Collector のカスタムプロセッサとして組み込む

はじめに

この記事は newmo Engineering Advent Calendar 2025 の 19 日目の記事です。本記事では opentelemetry-collector-contribtailsamplingprocessor に着想を得た「ログのためのテイルベースサンプリング」を、OpenTelemetry Collector のカスタムプロセッサとして実装した話を紹介します。

背景

Datadog における Ingestion コストの課題

newmo では、Observability プラットフォームとして Datadog を利用しています。Datadog はメトリクス、トレース、ログを統合的に扱える非常に強力なツールですが、サービスの成長とトラフィックの増加に伴いこれらテレメトリも増加し、結果としてコストが増大しがちです。特に正常なリクエストログや安定したエンドポイントが吐いているログなどデバッグ時の重要度が相対的に低いログまで全量取り込むことは、費用対効果の観点から見直しが必要でした。

OpenTelemetry Collector によるテレメトリ転送

現在、newmo のシステムではテレメトリ(メトリクス、トレース、ログ)の収集・転送に OpenTelemetry Collector (以下、otel collector) を利用しています。otel collector を介した Datadog へのログ転送アーキテクチャについては、以前 tjun さんが書いた記事で詳しく解説されています。(OpenTelemetry Collectorを使ったCloud Run to Datadogの実装パターン

otel collector ではオープンソースの opentelemetry-collecotor-contrib に実装されている多種多様なコンポーネント(Receiver, Processor, Exporter)をプラグインのように組み合わせて利用できます。このリポジトリには、トレースのサンプリングを行うための tailsamplingprocessor という強力なコンポーネントが既に存在します。これは例えば、「リクエストの処理完了まで待って全スパンを評価し、エラーステータスを持つスパンが含まれていればそのトレース全体を保存する」*1のような高度なサンプリング制御を可能にします。

しかし、この tailsamplingprocessor はトレース専用のコンポーネントとなっており、残念ながらログに対してそのまま適用することはできません。

私たちが実現したかったのは、単に個々のログにおけるログレベル(Error か Info か)だけでサンプリング決定することではなく、「同一トレース ID を持つ一連のロググループ」をひとまとまりとして扱い、全体の結果に基づいてロググループをサンプリング決定することです。

例えば、あるリクエスト処理中に一度でもエラーログが発生したのであれば、そのリクエストに関連する正常系のログ(Info など)も含めて全て Datadog に取り込まなければ、前後の文脈を追うことが難しくなります。逆に、完全に正常終了したリクエストであれば、その一連のログはまとめて drop の対象としても問題ない、と考えました。

既存のコンポーネントにこの要件を満たすものが存在しなかったため、私たちは tailsamplingprocessor のロジックに着想を得て、ログ用のテイルベースサンプリングを行うカスタムプロセッサを自前実装することにしました。

テイルベースサンプリング

コスト削減と Observability の両立を目指す上で、鍵となるのが「サンプリング方式」の選定です。ここでは ヘッドベースサンプリング (Head-based Sampling) と比較することで、テイルベースサンプリング (Tail-based Sampling) についてより深く理解していきます。

ヘッドベースサンプリングとの違い

ヘッドベースサンプリングは、単一のテレメトリをアプリケーションから送信する時もしくは otel collector に到達した時に、サンプリング決定を行います。例えば「10% の確率で保存する」といったランダムな判断や、特定のユーザーIDに基づいてサンプリングするかどうかを決定します。決定ロジックが単純で、オーバーヘッドが小さく実装難易度も高くないので導入しやすい一方で、サンプリングされたテレメトリはリクエスト内で欠損があるのでトレース全体の文脈を理解するのが難しい場合があります。

対してテイルベースサンプリングは、リクエストの処理が完了するまでサンプリング決定を保留します。一連の処理が終わった段階で、「エラーが含まれていたか?」「レイテンシが高かったか?」「特定の (key, value) を含んでいたか?」を評価し、同一 Trace ID をもつロググループに対してサンプリング決定を行います。テイルベースサンプリングを採用することで S/N 比*2が高くコスト効率的な監視基盤の実装を実現できます。

懸念点

非常に強力なテイルベースサンプリングですが、導入にはアーキテクチャ上の考慮事項が存在します。

1. リソース消費

サンプリング決定を事後的に行うためには、あるリクエストに関連するすべてのログやスパンを otel collector 上のメモリにバッファリングしておく必要があります。 トラフィック量やトレースの保持期間設定によっては、メモリ消費量が顕著に増加するため、リソース設計が必要です。例えば tailsamplingprocessor において、最初にとある Trace ID を持つスパンが到達してからサンプリング決定までバッファリングする時間を decision_wait で設定できますが、デフォルトの 30s ではかなりメモリを圧迫することになります。

実際、デフォルト値のままデプロイした結果 high memory usage というメッセージと共にトレースをドロップしてしまう結果となりました。

(図1: テイルベースサンプリングを適用後 CPU / メモリの使用量が一時的に上がっている様子)

2. スケーリングの難しさ

otel collector を複数台構成(Replica > 1)で運用している場合にもさらに複雑な課題が発生します。同一 Trace ID のログが別々のインスタンスに振り分けられてしまうと、各インスタンスはロググループの全体像を把握できず、正しいサンプリング決定ができません。

これを解決するには、前段に loadbalancingexporter を配置し、Trace ID に基づいたコンシステントハッシュを用いて同じ Trace ID のテレメトリが必ず同じ インスタンスに届くような構成(ステートフルなルーティング)を組む必要があります。

ログのテイルベースサンプリングとは?

ここまでトレースを前提としたような説明をしてきましたが、ログにおけるテイルベースサンプリングも、概念は全く同じです。

構造化ログの中に含まれる Trace ID をキーとして、一連のログを「ひとつのリクエスト処理」としてグルーピングします。あとはトレースの場合と同様に、バッファリングしグループ全体でサンプリング決定し、グループごと保存するか破棄するかを決定するだけです。

OpenTelemetry Collector のカスタムプロセッサ

実装の詳細に入る前に、otel collector の拡張性について軽く紹介します。

otel collector は、データを受け取る Receiver、加工する Processor、外部へ送信する Exporter という3つの主要コンポーネントによるパイプラインで構成されています。

基本的なユースケースでは opentelemetry/opentelemetry-collectoropentelemetry/opentelemetry-collector-contrib で提供されているプロセッサ(batch, memory_limiter, attributes, ...)を組み合わせるだけで事足りますが、今回のように「カスタマイズした制御を行いたい」「特殊なビジネスロジックを挟みたい」といった要件がある場合、Go 言語で独自の カスタムプロセッサ を実装することが可能です。

インターフェースの実装とビルド

カスタムプロセッサを作るといっても、ゼロから全てを作るわけではありません。基本的には、otel collector が定義しているインターフェースを満たす Go の構造体を実装すれば良いです。

また、作成したカスタムプロセッサを組み込む際は、OpenTelemetry Collector Builder (OCB) というツールで手軽に行えます。newmo での導入事例はこちらの記事で紹介されているのでご覧ください: OpenTelemetry Collectorを使ったCloud Run to Datadogの実装パターン

今回は、この仕組みを利用して「ログ用テイルベースサンプリング」のロジックを Go で実装し、独自の Collector バイナリとしてビルドするアプローチを採りました。

実装を少しだけ見てみる

では、具体的にどのようなコードで動いているのか、実装の核心部分を本家の tailsamplingprocessor (Trace用) を例に見ていきます。

処理の流れは、「受信」「イベントループ」「バッファリング(兼・遅延処理)」「サンプリング決定」 の4ステップで構成されています。

1. 受信とグルーピング (ConsumeTraces)

まず、プロセッサのエントリーポイントとなる ConsumeTraces です。ここでは、送られてきたスパンの集合を受け取り、処理しやすい単位に整理します。まず、満たすべき実装については以下のとおりです。

ここでは、バラバラに来るスパンを TraceID 単位に束ね直して、裏で動いている goroutine へ渡すことだけを行っています。

// ConsumeTraces is required by the processor.Traces interface.
func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
    for _, rss := range td.ResourceSpans().All() {
        // TraceID ごとにスパンをグルーピングする
        // (ログの場合、ここで plog.Logs を TraceID ごとに仕分ける)
        idToSpansAndScope := groupSpansByTraceKey(rss)

        // ...(中略)...

        // グルーピングしたデータをバッチとしてまとめ、処理用チャネル(workChan)に流す
        // ここで即座に処理せずチャネルに流すことで、受信処理をブロックしないようにしている
        if len(batch) > 0 {
            tsp.workChan <- batch
        }
    }
    return nil
}

2. イベントループと時間管理 (iter / loop)

プロセッサの心臓部です。ここでは「データの到着」と「時間の経過 (Tick)」の両方を待ち受けています。Tick の間隔はデフォルトで1秒です。つまり、データの処理とは非同期に、毎秒「判定待ち時間 (decision_wait) を過ぎたデータはないか?」という監視が行われています。

func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan <-chan []traceBatch) bool {
    select {
    // A. 新しいデータが届いた場合
    case batch, ok := <-workChan:
        // ...(中略)...
        for _, trace := range batch {
            // メモリ上のマップに追加する処理
            tsp.processTrace(trace.id, trace.rss, trace.spanCount)
        }

    // B. 定期的な tick が来た場合
    case <-tickChan:
        // 後述の処理を行う
        tsp.samplingPolicyOnTick()
    
    // ...(中略)...
    }
    return true
}

この iter メソッドを loop メソッドが呼び出し続け、loop はプロセッサの初期化処理の中で、別の goroutine として呼び出されています。

func (tsp *tailSamplingSpanProcessor) loop() {
    ticker := time.NewTicker(tsp.tickerFrequency)
    defer ticker.Stop()
    defer close(tsp.doneChan)
    for tsp.iter(ticker.C, tsp.workChan) {
    }
}

3. バッファリングと遅延データの処理 (processTrace)

processTrace は基本的にはデータをメモリに溜める場所ですが、「すでに判定が終わったはずのデータが遅れて届いた場合」 の処理もここで行います。

func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64) {
        // ...(中略)...
    // 1. メモリ上の管理データを取得(なければ新規作成)
    actualData, ok := tsp.idToTrace[id]
    if !ok {
        // 新規 TraceID ならバッファを作成し、「判定待ちリスト」に登録
        actualData = &traceData{ /* ... */ }
        tsp.idToTrace[id] = actualData
        tsp.decisionBatcher.AddToCurrentBatch(id)
    }
    
    // ...(中略)...
    // 2. 「この TraceID をすでに判定済みか」を確認
    finalDecision := actualData.finalDecision

    // パターンA: まだ判定していない(普通のパターン)
    if finalDecision == samplingpolicy.Unspecified {
        // バッファに追記して終了
        // あとは Tick を待つ
        appendToTraces(actualData.ReceivedBatches, rss)
        return
    }

    // パターンB: すでに判定済み(Late Arrival)
    // Tick を待たずに、過去の決定に従ってすぐ処理する
    switch finalDecision {
    case samplingpolicy.Sampled:
        // 「サンプルする」と決まっていた ID なので、即座に通す
        tsp.forwardSpans(tsp.ctx, traceTd)
    case samplingpolicy.NotSampled:
        // 「捨てる」と決まっていた ID なので、即座に破棄
        tsp.releaseNotSampledTrace(id)
    }
    // ...(中略)...
}

このように、processTrace が受付と同時に「遅刻データの即時処理」も担うことで、ネットワーク遅延などで順序が前後したり遅延してきたスパンに対してもグループの整合性を保ってサンプリング処理できます。

4. メインの判定処理 (samplingPolicyOnTick)

最後に、Tick によって呼び出されるメインの判定ロジックです。 設定された decision_wait(例: 30秒)が経過したデータに対して、まとめてサンプリング決定を下します。

func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool {
    // 判定待ちリストから、時間が経過した TraceID 群を取得
    batch, hasMore := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch()

    for _, id := range batch {
        trace, ok := tsp.idToTrace[id]
        // ...(中略)...

        // ここでポリシー(エラーがあるか?等)を評価し、サンプリング決定を下す
        decision := tsp.makeDecision(id, &trace.TraceData, metrics)

        // 決定を記録(遅延して到着する span 対応のため)
        trace.finalDecision = decision

        // 判定結果に応じて処理を分岐
        if decision == samplingpolicy.Sampled {
            // Sampled なら次のプロセッサへデータを渡しながらメモリから破棄して終了
            tsp.releaseSampledTrace(ctx, id, allSpans)
        } else {
            // NotSampled ならメモリから破棄して終了
            tsp.releaseNotSampledTrace(id)
        }
    }
    return hasMore
}

コードを追うと、非常に簡潔ながら堅牢な実装になっていることがわかります。

  1. Ingest: データを TraceID というキーで整理する
  2. Tick: 1秒ごとに「時間切れのデータ」がないか監視する
  3. Process:
    • 通常ケース: メモリに溜めて判定を待つ
    • 遅延ケース: すでに TraceID が判定済みの ID なら、待たずに即時処理する
  4. Evaluate: 時間が来たら中身(エラーログがあるか等)を見て、保存するか捨てるかを決める

ログのテイルベースサンプリングを作成するにあたっても、この ptrace.* の部分を plog.* に置き換え、makeDecision の中の評価ロジックを「スパンのステータス」から「ログの severity」などに変えるだけで、このロジックをそのまま活用できました。

OpenTelemetry Collector に組み込む

ここまで実装を見てきた tailsamplingprocessor は以下のように otel collector に組み込むことができます。

processors:
  tail_sampling: # <- ここからポリシーなどを設定
    decision_wait: 5s
    sample_on_first_match: true
    policies:
      # エラーステータスを持つトレースは 100% サンプリング
      - name: errors-100pct
        type: status_code
        status_code:
          status_codes: [ERROR]
        # OTTL 条件式を使用して特定の body 値を持つログを 100% サンプリング
      - name: subscriber-100pct
        type: ottl_condition
        ottl_condition:
          error_mode: ignore
          span:
            - 'attributes["service.name"] == "critical-service"'
      # 高いレイテンシを持つトレースは 100% サンプリング
      - name: latency-100pct
        type: latency
        latency:
          threshold_ms: 1000
      # それ以外は 10% の確率でサンプリング
      - name: default-probabilistic
        type: probabilistic
        probabilistic:
          sampling_percentage: 100
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [tail_sampling, batch] # <- ここに追加
      exporters: [datadog/exporter]

そして、今回の主題であるカスタムプロセッサ (logtailsamplingprocessor) も tailsamplingprocessor と同じ設定項目を持つように実装すれば、以下のように otel collector に組み込むことができます。

processors:
  logtail_sampling: # <- ここからポリシーなどを設定
   decision_wait: 5s
   sample_on_first_match: true
   policies:
       # エラーレベル以上のログを含むロググループは 100% サンプリング
     - name: errors-100pct
       type: severity
       severity:
         min_severity: ERROR
    # OTTL 条件式を使用して特定の body 値を持つログを 100% サンプリング
    - name: service-name-policy
      type: ottl_condition
      ottl_condition:
        error_mode: ignore
        log:
          - 'body["service.name"] == "critical-service"'
     # それ以外は 10% の確率でサンプリング
     - name: default-probabilistic
       type: probabilistic
       probabilistic:
         sampling_percentage: 10
service:
  pipelines:
    logs:
      receivers: [googlecloudpubsub]
      processors: [logtail_sampling] # <- ここに追加
      exporters: [datadog/exporter]

logtailsamplingprocessor におけるサンプリングポリシーについては以下を参考に実装しました。

github.com

github.com

まとめ

本記事では、OpenTelemetry Collector の tailsamplingprocessor に着想を得て、ログ用のテイルベースサンプリングを行うカスタムプロセッサを実装・導入した事例を紹介しました。我々の環境で導入した結果、実際のサンプリング結果は設定したサンプリングレートよりも少し大きいです。理由は、Trace ID が付与されていないログを通すようにしていたり、トレースによって保持するログの数がバラバラだからです。ですが導入により、重要なデバッグ情報は一切損なうことなく、肥大化しがちだったコストを適切に削減できています。

(図2: テイルベースサンプリングを適用後該当 GraphQL Operation のログの数が減っている様子)

余談ですが、私たちが利用している opentelemetry-collector-contrib は、執筆時点でもまだ メジャーバージョンが 0 です(v0.131.0)。そのため、マイナーバージョンアップであっても破壊的変更が含まれることが多々あります。非常に強力なエコシステムですが、バージョンアップの際は Changelog を熟読し、慎重に行うことをおすすめします(自戒を込めて)。

書いた人: tobi

*1:より正確には、「収集したスパンを一定時間バッファリングし、そのスパン全体でサンプリング決定」しています。

*2:有効で役にたつ情報 (signal) と役に立たない情報 (noise) の比率。