DIGGLE開発者ブログ

予実管理クラウドDIGGLEのプロダクトチームが、技術や製品開発について発信します

PapaParseで始めるストリーム処理

はじめに

CSVって便利ですよね

ある程度の量のデータをユーザも扱いやすい形で一度に入出力しようとする場合、実質CSVが選択肢の筆頭になるかと思います。

一方で、データが巨大になりやすく、パフォーマンスの課題が出やすい箇所でもあります。

今回は、DIGGLEでCSVを扱っている処理で実際に起きているパフォーマンスの課題に対してストリーム処理を使って改善した例を紹介します。

(この記事では、より汎用的なワードとしての「ストリーム処理」とNode.js/PapaParseのインタフェースとしての「Stream」を使い分けます)

パフォーマンス上の悩み

DIGGLEでは一部のサブシステムがNode.jsで動いており、そこでもCSVを扱っています。

CSVの変換にはPapaParseというJavaScriptでは一般的なライブラリを使用しています。 www.papaparse.com

ごく簡単に説明すると、入力されたCSVをパースし、変換や集計などの処理を行った結果をCSVに変換して出力するというものです。

ただ、メモリ使用効率が悪く、巨大なCSVが指定された場合にメモリが足りなくなるため、CSVファイルを分割してアップロードしてもらう運用をするケースが増えてきました。

ストリーム処理について

巨大なデータを扱う際の手法としてストリーム処理は一般的です。(ストリーム処理の詳細については他にも優良な記事があるため今回は割愛します)

ストリーム処理を使用する目的

個人的にはストリーム処理といった時に目的は大きく2通りに分けられると思っています。 実際は各言語やライブラリのAPIとしては両方のパターンを扱える場合も多いと思うので、あまり意識することはないかもしれません。

目的 用途
リアルタイム処理 都度届くデータに対して一つのリソースとして扱いつつ、逐次処理も可能にする Kafka Streams
分割実行 巨大なデータに対して必要な分だけ都度メモリに乗せて処理することでメモリ使用量を抑える ファイル読み込み/書き込み

今回は、後者の目的に絞ります。

Node.js / PapaParseにおけるStream

Node.jsではこのようにAPIが定義されています Stream | Node.js v22.12.0 Documentation

PapaParseではparse時にstepオプションを指定することでストリーム処理をすることができます Documentation - Papa Parse

注:Nodeで使用する場合とブラウザで使用する場合で若干の違いがあります(「PaPa Parse for Node」の項目を参照) GitHub - mholt/PapaParse: Fast and powerful CSV (delimited text) parser that gracefully handles large files and malformed input

実例

今回改善した処理

CSVファイルのフォーマットは千差万別です。また他システムから出力されるファイルを取り込むケースでは、DIGGLEでは不要な行や列が含まれることも多くあります。

今回は、最初にデータ(行、列)の絞り込みを行うことで扱うデータ量を減らし、より巨大な入力に耐えられることを目指しました。 この絞り込みを通常のparse処理で行うとメモリ消費は入力データに応じたサイズになりますが、ストリーム処理することで絞り込み後のデータに応じたサイズにまで軽減できます。

なお、理想的には出力もStream化することで最終的な効率化が実現できますが、後段の処理でデータをメモリ上に保持する必要があったため、この絞り込みについては入力:String型、出力:配列型という形に落ち着きました。

PapaParseのドキュメントにも下記のような記載があり、いつかは完全Stream化したいです

How do I get all the results together after streaming?

You don't. Unless you assemble it manually. And really, don't do that... it defeats the purpose of using a stream. Just take the parts you need as they come through.

https://www.papaparse.com/faq#streaming

実際のコードは下記のようになりました(一部、公開用に変更しています)

import { Readable } from 'node:stream';

function convertPapaData2Result(data) {
  // TBD:  filter、および、列の除去
  return result;
}

class PapaStream {
  async execute(input) {
    // 結果保持用
    const results = [];

    const stepRun = papaResult => {
      const result = convertPapaData2Result(papaResult.data);
      if (result) {
        results.push(result);
      }
    };

    const stream = Readable.from(input);

    await this.parseStream(stream, stepRun);

    return results;
  }

  async parseStream(stream, stepRun) {
    return new Promise((resolve, reject) => {
      const parseConfig = {
        step(data) {
          if (data.errors.length > 0) {
            throw new RuntimeFailure('parse error');
          }
          stepRun(data);
        },
        error(e) {
          stream.destroy();
          reject(e);
        },
        complete(r, a) {
          a.destroy();
          resolve();
        },
        header: true,
        skipEmptyLines: true
      };
      Papa.parse(stream, parseConfig);
    });
  }
  • inputはString型でReadable.fromを使用してStreamを生成しています
  • resultsは出力となる配列、要素はCSVの行データ+内部用の情報です
  • convertPapaData2Resultで行う加工・フィルタは実際はユーザ側でカスタマイズも可能な仕組みで複雑なため、省略しています
  • PapaParse.parse
  • completecallbackが呼ばれた際に実行を終了させるためにPromiseを使用しています
    • それにより呼び出し元を含めてasync-awaitに変更しました

おまけ (PapaParseのStreamだからできたパフォーマンス改善)

今回扱っているサブシステムの処理の一部に 「元のCSVの該当する行の元データがCSVフォーマットで欲しい」というのがあります

例えば下記のようなCSVがあった時に

A,B,C,D
101,あいうえお,100,円
102,"ABC
DE",1.99,ドル
103,かきくけこ,200,円

2つ目は 102,"ABC\nDE",1.99,ドルだ、という情報が欲しいイメージです

従来は、一度parseした後に1行ずつunparseを行うことで実現していましたが、実際これはCPUの観点からもメモリの観点からも効率が悪かったため、今回Streamを使い下記のような実装を行いました(一部、公開用に変更・省略しています)

async function originalRows(data) {
  const stream = Readable.from(data);

  const originals = [];
  // streamで取得すると現在のcursor位置を取得できるので、その位置で区切った元の文字を取得する
  let cursor = 0;
  let idx = 0;

  return new Promise((resolve, reject) => {
    // 完全に元データ上の行番号を取り出すため、skipEmptyLine: false状態でparseした結果でindexを得る
    const parseConfig = {
      header: false,
      skipEmptyLines: false,
      step(line) {
        let original = data.slice(cursor, line.meta.cursor);
        if (original.endsWith(line.meta.linebreak)) {
          original = original.slice(0, original.length - line.meta.linebreak.length);
        }
        originals.push({
          num: idx,
          line: original,
        });

        cursor = line.meta.cursor;
        idx++;
      },
      complete(_r, a) {
        a.destroy();
        resolve(debugInfo);
      },
    };

    Papa.parse(stream, parseConfig);
  });
}

入力値はString型のCSVデータです

PapaParseのドキュメントにはResultのmetaにcursorについての記載はありませんが、実際はどの位置までparseされたかの情報が取得できます。 今回はそれを使用して、元のStringから該当範囲を抽出する形で要件を実現しています。

このcursor情報を行ごとに取得するためにStreamで処理しており、そういう意味で少し特殊な使い方かもしれません。

今回は最後までparseしていますが、必要な行までparseしたら中断することも可能で、そういった選択肢があるのもStreamの特徴です。

まとめ

本番環境では処理の偏りが大きくメトリクスなどでの変化を示すことは残念ながら難しいですが、確実な改善が見られました。

今回の例だと、コード量でいえば導入前より増えてしまっていますし、Streamを使わない書き方が理解しやすいというのは否定できません。 ただし、それは既存のコードに対して後から部分的に導入せざるを得なかったという要因もあります。

例えば、入出力をStreamとして処理を有意な単位で分割し、それをpipelineで繋げることで、本来の使い方ができます。それによりストリーム処理のメリットをフルに享受できるのに加え、上記のデメリットも改善されると思います。

多くの場合、Stream処理を使わなくても実装できてしまうため、特に初期実装時においては意識をしないとStream処理を使うことは少ないかもしれません。一方で、いざパフォーマンスの課題が出た後に導入しようとすると手間も多くなることも実感としてあるので、できるだけ早い段階で選択肢に入れることをお勧めします。

何かの参考になれば幸いです。

We’re hiring!

DIGGLEではともにプロダクトを開発・運用をしてくれるエンジニアを大募集中です。 少しでも興味があれば、ぜひ下記採用サイトからエントリーください。
カジュアル面談も実施しているので、気軽なお気持ちでご応募いただければと思います!

herp.careers