Process Managerパターンで複雑な業務フローを見通しよく実装する

ogp

はじめに

こんにちは、enechainでエンジニアをしている青戸です。

イベント駆動のシステムで、処理の流れが複雑化して「ピタゴラスイッチ」状態になってしまったことはありませんか?

あるイベントが発生して、それをトリガーに別の処理が動いて、さらにその結果を受けて次の処理が......と連鎖していくうちに、全体のフローを把握するのが困難になる。どこで何が処理されるのかを追うのに、複数のハンドラを行ったり来たりしながら読み解かなければならない。そんな経験をしたことがある方も多いのではないでしょうか。

我々が開発しているプロダクトにおいても、業務プロセスが複雑化するにつれてまさにこの問題に直面しました。本記事では、Process Managerパターンを導入してこの問題を解決した経験を、実装例とともに紹介します。

Process Managerパターン導入以前のアーキテクチャ

CQRS+Event Sourcing の採用

本記事で題材とするのは我々が開発している、エネルギー取引のドメインを扱うプロダクトです。 公平な取引が行われていたことを証明するため官公庁への取引ログ提出を求められる場合があり、任意の時点でのシステム状態を復元できる必要があるため、 CQRS+Event Sourcingパターンを採用してシステムを構築しています。

イベントをトリガーとした業務プロセスの自動化

ある業務の完了をトリガーに、別の業務を実行するというパターンは非常に多く見られます。我々のプロダクトでも「注文が作成されたら履歴を作成する」「約定したら通知を送る」といった後続業務が多数存在しています。Event Sourcingではイベントが一級市民であるため、自然とイベントをトリガーとして非同期で後続処理を実行するパターンを採用することとなりました。 一連の業務処理を一つのトランザクションで実行するのではなく、イベントを介して非同期に連携させることによって、異なる業務同士を疎結合に実装できたり、ユーザーリクエストに対するレスポンス時間の短縮にもつながるなど、イベント駆動の利点を得られています。

直面した課題: 単純なイベントハンドラの限界

イベント駆動な業務プロセスの自動化を実現するため、最初はイベントに対するハンドラを個別に定義するシンプルなアプローチを取っていました。 このパターンは「イベントを受けて1つの処理を行う」という単純なケースには適しており、我々のプロダクトでも履歴作成や通知送信など、多くの後続処理はこのパターンで実装しています。

しかし、業務プロセスが複雑になると、このパターンでは対応しきれなくなりました。 例えば下記のような業務の流れを考えてみましょう。①と②という2つの業務があり、①の業務はイベントAから始まり、処理1 → イベントB → 処理2-1の順に進みます。一方、②の業務は業務の開始位置が異なり、イベントBから始まり処理2-2と進みます。

複雑な業務プロセスの例

どちらの場合も途中でイベントBを受け取りますが、特定のイベントに対応して後続の処理を実行するだけの単純なイベントハンドラでは、イベントBを受けたときにこれがどの業務プロセスに属するイベントなのかを判別できないため、次にどの処理に進んでいいのかが判断できません。この問題はイベントハンドラが前のステップのコンテキストを持たないことによって生じています。

対応策として、業務プロセスに関するコンテキスト情報をイベントに含める方法が考えられます。しかし、これにはいくつかの課題があります。

まず、業務プロセスの制御に関する情報までイベントに含めてしまうと、イベントが本来持つ「業務で起こった事実を記録する」という責務から逸脱してしまいます。また、異なる業務がお互いの存在を意識しなければならなくなり、疎結合性が損なわれます。業務プロセスが増えるたびにイベント定義 が肥大化し、保守が困難になるでしょう。さらに、外部システムを経由する業務プロセスでは、そもそも自システムのコンテキスト情報をすべてイベントに含めてもらうこと自体が難しい場合もあります。 加えて、一連の業務プロセスを独立したイベントハンドラの組み合わせで実装すると、全体のフローを把握しづらくなります。どのイベントがどこで処理されるのかを追うために複数のハンドラを行き来しながら読み解く必要があり、コー ドの可読性と保守性が低下します。

Process Managerパターンによる解決

我々のプロダクトでは、Process Managerパターンを導入することで上記の課題の解決を図りました。

Process Managerパターンとは

Process Managerは、イベント駆動の処理フローの中に複数の集約間のメッセージ交換を仲介・調整する役割を追加するパターンです。 業務フローにおける現在のステップやコンテキスト情報を保持し、受け取ったイベントに基づいて次に実行すべきアクション(コマンド)を決定して発行します。 Process Managerは下記のように動作し、複数のステップからなる業務の流れを管理するステートマシンのように振る舞います。

  1. イベントを受け取る
  2. 現在の状態を確認する
  3. 次に行うべきアクション(コマンド)を決定して送信する
  4. 状態を更新する

Process Managerの概要

実装時に意識したいのは、Process Managerはイベントを受け取り、コマンドを発行するという責務に徹することです。業務ロジック自体はコマンドハンドラ側に実装し、Process Managerはあくまで「どのコマンドを発行するか」を決めるルーティング役に専念します。こうすることで、コマンドハンドラとProcess Managerにロジックが分散することを防ぎ、コードの見通しが良くなります。

単純なイベントハンドラとProcess Managerの違いを表にまとめると下記のとおりです。

イベントハンドラ Process Manager
状態 持たない 持つ
判断基準 イベントの内容のみ イベント + 現在の状態
適する用途 1イベント→1処理 複数ステップの業務プロセス

実装例: 複数市場への注文掲載

ここからは、実際にどのようにProcess Managerを実装したのかを具体的なコードを示しながら紹介します。

要件

エネルギー取引を扱うシステムにおいて注文を作成するプロセスを考えます。 自システムは業務効率化のために注文をクローズドに管理するシステムですが、それとは別に外部公開されている取引所システムが存在し、ユーザーはそちらにも注文を掲載したい場合があります。2つの市場を区別するために、以下のように呼ぶこととします。

  • InternalMarket: 自システムが管理する市場
  • ExternalExchange: 外部システムで管理される外部取引所

ユーザーは自システムが管理する市場にのみ注文を掲載することもできますし、外部システムの取引所、またはその両方に掲載することもできます。 注文作成の流れは、例えば「両市場」を選択した場合、以下のようになります。

  1. 自システムで注文を作成
  2. 外部システムに注文作成をリクエスト
  3. 外部システムからの作成完了通知を待つ
  4. 外部取引所への掲載を反映
  5. 自市場への掲載を反映
  6. 作成完了を通知
  7. ユーザーの画面に注文が反映される

どちらか片方の市場のみを選択した場合は、上記のフローから不要なステップが省略されます。 注文作成プロセスは上記のようにユーザー起点で始まる場合もあれば、外部システムの取引所で注文が作成されたことをきっかけに自システムに注文を取り込む場合もあります。

では、この要件をProcess Managerを使ってどのように実装できるのかを見ていきましょう(コードはGo言語で記述しています)。

ProcessManagerインターフェース

Process Managerの核となるインターフェースは下記のとおりです。

type ProcessManager interface {
    CanHandle(event Event) bool
    Execute(ctx context.Context, event Event, exec ExecFunc) error
}

type ExecFunc func(ctx context.Context, cmd Command) error

各メソッドの役割は下記のとおりです。

  • CanHandle: このProcess Managerが処理すべきイベントかを判定
  • Execute: イベントを受け取り、状態に応じてコマンドを発行
  • ExecFunc: コマンドバスへの委譲関数。これを通じてコマンドを実行し他の集約を操作する

呼び出し元(Worker)の疑似コードは下記のようになります。Workerは複数のProcessManagerを保持しており、イベントを受け取ると対応するProcessManagerを探して実行します。commandBusはコマンドをコマンドハンドラにルーティングするコンポーネントです。

type Worker struct {
    processManagers []ProcessManager
    commandBus      CommandBus
}

func (w *Worker) HandleEvent(ctx context.Context, event Event) error {
    for _, pm := range w.processManagers {
        if pm.CanHandle(event) {
            return pm.Execute(ctx, event, func(ctx context.Context, cmd Command) error {
                return w.commandBus.Execute(ctx, cmd)
            })
        }
    }
    return nil
}

状態遷移の設計

Process Managerの設計では、まずは業務プロセスを構成するステップとその状態遷移を整理することから始めます。 業務のステップとしては下記があります。

  • NotStarted: プロセス未開始
  • AwaitingExternalCreation: 外部システムでの作成待ち
  • AwaitingExternalListing: 外部取引所へ掲載されたことをマークする処理待ち
  • AwaitingInternalListing: 自市場への掲載されたことをマークする処理待ち
  • Completed: プロセス完了

また、イベントとしては下記があります。

  • OrderCreated: 自システムで注文が作成された
  • ExternalOrderCreated: 外部システムで注文が作成された
  • ExternalOrderRejected: 外部システムでの注文作成が拒否された
  • ListedOnExternal: 外部取引所への掲載が完了した
  • ListedOnInternal: 自市場への掲載が完了した

ステップとイベントを受けたときの状態遷移を図示すると下記のようになります(丸がステップ、矢印がイベントによる遷移を表しています)。

状態遷移図

色がついているイベントに注目いただくと、もともとのリクエストの内容によって同じステップ・イベントでも次のステップが変わっていたり(青色)、同じイベントが異なるステップで発生していたり(赤色)することがわかります。 業務のコンテキストや現在のステップに応じて処理を分岐させる必要があるため、単純なイベントハンドラの組み合わせでは処理の流れを把握することはかなり難しくなることが想像いただけるのではないでしょうか。

のちほど具体的なコード例で示しますが、Process Managerのメソッドとして各イベントのハンドラを実装していきますので、状態遷移を表形式でまとめると実装時に役立ちました。 下記の状態遷移表では、列がイベント、行が現在のステップを、セルが状態に基づく次のステップを表しています(分岐がある場合は2行に分けて記載しています)。

現在のステップ OrderCreated ExternalOrderCreated ExternalOrderRejected ListedOnExternal ListedOnInternal
NotStarted (両市場) → AwaitingExternalCreation → AwaitingExternalListing - - -
(自市場のみ) → AwaitingInternalListing
AwaitingExternalCreation - → AwaitingExternalListing → Completed (失敗通知) - -
AwaitingExternalListing - - - (両市場) → AwaitingInternalListing -
(外部取引所のみ) → Completed
AwaitingInternalListing - - - - → Completed (成功通知)

この状態遷移をコードで表現します。まず、業務プロセスのステップを定義します。

var CreateSteps = struct {
    NotStarted               CreateStep // 未開始
    AwaitingExternalCreation CreateStep // 外部システムでの作成待ち
    AwaitingExternalListing  CreateStep // 外部取引所への掲載待ち
    AwaitingInternalListing  CreateStep // 自市場への掲載待ち
    Completed                CreateStep // 完了
}{
    NotStarted:               "not-started",
    AwaitingExternalCreation: "awaiting-external-creation",
    AwaitingExternalListing:  "awaiting-external-listing",
    AwaitingInternalListing:  "awaiting-internal-listing",
    Completed:                "completed",
}

次に、業務プロセスの状態を保持する構造体を定義します。

type CreateState struct {
    ProcessID  uuid.UUID     // 業務プロセスを一意に識別するID
    Step       CreateStep    // 現在のステップ
    OrderID    OrderID       // 対象の注文ID
    MarketDest Market        // 掲載先(InternalMarket / ExternalExchange / Both)
}

// ステップを遷移させるヘルパー
func (s *CreateState) To(step CreateStep) *CreateState {
    return &CreateState{
        ProcessID:  s.ProcessID,
        Step:       step,
        OrderID:    s.OrderID,
        MarketDest: s.MarketDest,
    }
}

ProcessIDがポイントです。業務プロセスの開始時(例えば注文作成時)にProcessIDを生成し、後続のイベントにはこのProcessIDのみを含めます。イベント定義にプロセスのコンテキスト情報を含める代わりに、ProcessIDを使って複数のイベントにまたがるプロセスを識別し、必要なコンテキスト(ここでは元々の注文の掲載先)はStateから取得します。

外部システムを経由したコンテキストの受け渡しについては、今回のケースにおいては外部システムがリクエストIDを受け取り、結果通知時に同じIDを返す仕組みであったため、ProcessIDをリクエストIDに含めることによって実現しました。自システムに必要なコンテキスト情報をすべて外部システムに引き回してもらうのは現実的でない場合が多いと思いますが、リクエストIDのような一般的な仕組みを活用することで実現できるところもこのパターンの利点です。

Executeメソッドの実装

Executeメソッドでは、イベントの種類に応じたハンドラを呼び出し、次の状態を保存します。

func (m *createOrderProcessManager) Execute(
    ctx context.Context,
    event Event,
    exec ExecFunc,
) error {
    // イベント種別に応じたハンドラを呼び出し
    nextState, err := func() (*CreateState, error) {
        switch e := event.(type) {
        case *OrderCreated:
            return m.handleOrderCreated(ctx, e, exec)
        case *ExternalOrderCreated:
            return m.handleExternalOrderCreated(ctx, e, exec)
        case *ExternalOrderRejected:
            return m.handleExternalOrderRejected(ctx, e, exec)
        case *ListedOnExternal:
            return m.handleListedOnExternal(ctx, e, exec)
        case *ListedOnInternal:
            return m.handleListedOnInternal(ctx, e, exec)
        default:
            return nil, nil
        }
    }()
    if err != nil {
        return err
    }

    // 次の状態を永続化
    if nextState != nil {
        return m.state.Store(ctx, nextState)
    }
    return nil
}

次のセクションで実装例を紹介しますが、イベントハンドラは内部的にコマンドハンドラを呼び出しコマンドを実行します。 このとき、新たにイベントが発生するため、即座にコミットしてしまうと次の処理が現在の処理の完了を待たずに始まってしまい、古い状態を参照して処理されてしまう可能性があります。 これを防止するためにコマンドの実行と状態の保存を同一トランザクション内で行うようにしています。

イベントハンドラの実装

核心となる部分です。イベントハンドラでは現在のProcess Managerの状態に応じて処理を分岐させます。 下記はExternalOrderCreatedイベントを処理するハンドラの例です。

現在のステップによって、注文作成が自システム起点なのか外部システムなのかを判別し、適切なコマンドを発行しています。

func (m *createOrderProcessManager) handleExternalOrderCreated(
    ctx context.Context,
    event *ExternalOrderCreated,
    exec ExecFunc,
) (*CreateState, error) {
    // ProcessID から現在の状態を取得
    prev, err := m.state.FindByProcessID(ctx, event.ProcessID)
    if err != nil {
        return nil, err
    }

    switch {
    case prev.Step == CreateSteps.NotStarted: // 外部システムで発生した注文のため、自システムに取り込む
        cmd := NewAcceptExternalOrderCreatedCommand(prev.ProcessID, event)
        if err := exec(ctx, cmd); err != nil {
            return prev, err
        }
        return prev.To(CreateSteps.AwaitingExternalListing), nil
    case prev.Step == CreateSteps.AwaitingExternalCreation: // 自システム起点での注文作成完了通知を受け取った場合、既存の注文に反映させる
        cmd := NewListOnExternalCommand(prev.OrderID, prev.ProcessID)
        if err := exec(ctx, cmd); err != nil {
            return prev, err
        }
        return prev.To(CreateSteps.AwaitingExternalListing), nil
    default:
        return prev, fmt.Errorf("unexpected step: %s", prev.Step)
    }
}

同様に、ListedOnExternalイベントを処理するハンドラも見てみましょう。 もともとの注文作成時に指定された掲載先に応じて、次のステップを分岐させています。

func (m *createOrderProcessManager) handleListedOnExternal(
    ctx context.Context,
    event *ListedOnExternal,
    exec ExecFunc,
) (*CreateState, error) {
    // ProcessID から現在の状態を取得
    prev, err := m.state.FindByProcessID(ctx, event.ProcessID)
    if err != nil {
        return nil, err
    }

    switch {
    case prev.Step == CreateSteps.AwaitingExternalListing:
        switch prev.MarketDest { // 注文作成時の掲載先に応じて分岐
        case Markets.Both: // 両市場の場合は自市場にも掲載
            cmd := NewListOnInternalCommand(prev.OrderID, prev.ProcessID)
            if err := exec(ctx, cmd); err != nil {
                return prev, err
            }
            return prev.To(CreateSteps.AwaitingInternalListing), nil
        case Markets.External: // 外部取引所のみの場合は完了通知を送信
            cmd := NewNotifyCreationSucceededCommand(prev.OrderID, prev.ProcessID)
            if err := exec(ctx, cmd); err != nil {
                return prev, err
            }
            return prev.To(CreateSteps.Completed), nil
        default:
            return prev, nil
        }
    default:
        return prev, fmt.Errorf("unexpected step: %s", prev.Step)
    }
}

以上のようにして、複雑な業務プロセスを、イベント駆動の利点を損なうことなく見通しよく実装することができました。

まとめ

本記事では、複雑なイベント駆動の業務プロセスをProcess Managerパターンで実装した事例を紹介しました。

Process Managerが解決する課題

  • 単純なイベントハンドラでは、複数ステップにまたがる業務プロセスで「どの文脈のイベントか」を判別できない
  • 処理が複数のハンドラに分散し、全体のフローを把握するのが困難になる
  • Process Manager に業務プロセスに関する知識を集約し、現在のステップやコンテキストを状態として保持することで、イベント駆動の利点を損なうことなく複雑な業務プロセスを見通しよく実装できる

設計するときのポイント

  • まず、業務プロセスをステップと状態遷移で整理すると実装しやすい
  • 業務ロジックはコマンドハンドラ側に実装し、Process Managerはルーティングに徹する
  • Process Managerは強力なパターンだが、単純なケースには過剰な抽象化になり得るため、使いどころを見極める必要がある(我々のシステムでも、履歴作成や通知送信といった単純な後続処理はシンプルなイベントハンドラで実装しています)

イベント駆動のシステムで業務プロセスが複雑化してきたときは、Process Managerパターンの導入を検討してみてください。


enechainでは、日本のエネルギーの未来を一緒に作っていく仲間を募集しています。興味を持っていただけた方は下記のリンクからぜひご応募ください。

herp.careers herp.careers