こんにちは、プラットフォーム技術部第1グループの河野です。 クラウドサービスを使用しているとメモリやストレージを節約したくなりますよね? 今回はそんな節約に役立つNode.jsのStream APIについてです。Java等にもStreamはありますが、全くの別物です。AWSのSDKと組み合わせて、S3からダウンロード、圧縮してアップロードするサンプルと共に紹介していきます。
S3についての説明はここでは省略します。こちらのブログの「S3とは」のセクションに簡単な説明があります。 developers.play.jp
Node.jsのStream APIとは
通常のファイル等をダウンロードして読み込む処理では、データを一括で読み込んでメモリに載せることになると思います。 しかし、Stream APIを使用すると、データをチャンク*1に分けて読み込み、チャンク毎に圧縮やアップロード等の処理をすることができます。チャンクごとに処理することで、メモリの消費量を節約しつつ数GB以上のファイルを処理することができるようになります。使用するケースによっては、ストレージに書き出す必要が無くなることもあります。
AWS SDKとStream APIを使用したサンプルコード
サンプルコードと共に使い方を見ていきます。
概要とサンプルコード
Node.jsのバージョンは18.17.1、AWS SDKはV3を使用しています。 S3からStreamでファイルを取得、圧縮してアップロードするサンプルを書いてみます。 AWS SDKはStream APIに対応していて、S3からGetObjectで取得したレスポンスのBodyはStreamになっています。*2 このサンプルではメモリの消費量が抑えられ、実行環境に一時的にファイルを出力することもありません。
const { S3Client, GetObjectCommand } = require("@aws-sdk/client-s3"); const { Upload } = require("@aws-sdk/lib-storage"); // アクセスキーなど秘密の情報 const { accessKey, secretKey, bucket, region } = require("./config"); const zlib = require("zlib"); const test = async () => { const client = new S3Client({ credentials: { accessKeyId: accessKey, secretAccessKey: secretKey, }, region, }); const command = new GetObjectCommand({ Bucket: bucket, Key: "upload_test.txt", }); const response = await client.send(command); const stream = response.Body; const zipStream = zlib.createGzip(); stream.on("data", (chunk) => { console.log("chunkを処理"); }); stream.on("error", (err) => { console.log(err); console.log("getの際にStreamでエラー"); zipStream.destroy(err); }); stream.on("end", () => { console.log("全部のchunkの転送終わった"); }); zipStream.on("data", (chunk) => { console.log("chunkを処理"); }); zipStream.on("error", (err) => { console.log(err); console.log("zip時にエラー"); stream.destroy(err); }); zipStream.on("end", () => { console.log("zip完了"); }); // 圧縮用のStreamに渡す stream.pipe(zipStream); const upload = new Upload({ client: client, params: { Bucket: bucket, Key: "upload_test.gz", Body: zipStream, }, // 大きいファイル想定する場合はpartSizeを大きくする // partSize: 1024 * 1024 * 60, }); console.log("upload宣言後"); await upload.done(); console.log("done"); }; test();
処理の流れと挙動の説明
サンプルコードには下記の処理がありますが、Streamの処理のイメージとしては、これらの処理をデータが一括で通過していくのではなくチャンク単位で通過しているイメージです。チャンク単位で圧縮されているというのがイメージ付きづらいですが。。
- S3からデータの取得
- 取得したデータをgzipで圧縮する
- S3へデータをアップロードする
どのような順番で処理されるのか、 ある程度容量があるファイルで実際にデバッガーで止めて確認してみると、以下のようになっていました。
upload.done()
でawaitを開始stream.on("data")
のイベントが1〜2回発火zipStream.on("data")
のイベントが 2.で発火した分だけ発火。(この1〜2件のチャンクだけ圧縮されてます)- 2〜3.を繰り返す
stream
で全てのチャンクが処理されて、stream.on("end")
が発火zipStream
で全てのチャンクが処理されて、zipStream.on("end")
が発火- アップロード完了
次にサンプルコードの中で登場するStream APIの機能の一部について解説します。
pipeについて
pipeメソッドは、Streamを書きこみ用や圧縮用等の別のStreamへ関連付けてデータを渡してくれます。pipeについての注意点は前半でエラーすると後続へは進まないため、pipeで使用するStreamには全てエラーイベントを用意しておくべきということです。 また、エラーした時は繋げている他のStreamを閉じる必要があります。 WEBアプリケーション等起動し続けているプログラムでStreamを使用する場合メモリリークになる可能性もありますので注意が必要です。 Node.js公式に下記のような記載があります。
One important caveat is that if the Readable stream emits an error during processing, the Writable destination is not closed automatically. If an error occurs, it will be necessary to manually close each stream in order to prevent memory leaks.
Streamで使用できるイベント
Streamを処理している際に.on()
を使用してイベントを取得することができます。主に使うのはこの辺りでしょうか。
イベント名 | 説明 |
---|---|
data | 分割したチャンクを処理するごとに発火 |
error | Streamでエラーが発生した時に発火 |
end | 全部のチャンクを処理し終わった時に発火 |
stream.on("data", (chunk) => { // チャンクのバイト情報 console.log(chunk); console.log("chunkを処理"); }); stream.on("error", (err) => { // エラーの詳細を出力 console.log(err); console.log("Streamでエラー"); // エラー処理用の関数やコールバックするなど }); stream.on("end", () => { console.log("全部のchunkの転送終わった"); });
他にもいくつかありますので、Node.jsのドキュメントのEvent:となっている箇所を参照いただければと思います。nodejs.org
エラー処理を簡潔にできるpipeline関数
Node.js 15以降はpipeline関数が追加されて、エラーハンドリングをまとめて行うことができます。エラーした時に他のStreamを閉じる必要もなくなります。 上記のサンプルコードをpipelineで書き換えると下記のようになります。
const { pipeline } = require("stream/promises"); // ~略 try { await pipeline(stream, zipStream, upload.done()) } catch (error) { console.log("pipelineでerror"); console.log(error); } console.log("done");
複数ファイルの圧縮
複数ファイルのStreamを1ファイルに圧縮をしたい場合、ライブラリを使用した方が書きやすそうだったので、簡単に紹介します。 archiverというライブラリです。 github.com
圧縮したいStreamをappendしていきfinalizeすると圧縮できます。
const archiver = require("archiver"); const fs = require("fs"); const archive = archiver("zip", { zlib: { level: 9 } }); const writeStream = fs.createWriteStream("./test.zip"); const keyList = ["./hoge.txt", "./fuga.txt"] for (const key of keyList) { const readStream = fs.createReadStream(key); archive.append(readStream, { name: key }); } archive.pipe(writeStream); archive.finalize();
最後に
ここで紹介できた内容はStream周りの機能の一部です。調べていくと色々な書き方が他にもあります。またNode.jsのドキュメントにExperimentalでいくつか機能が追加されていましたので、今後も動向のキャッチアップが必要になるかもしれませんね。 最後まで読んでいただきありがとうございます。
参考
Node.js公式
https://nodejs.org/api/stream.html
StreamでCSVファイルをS3からS3にコピーする
https://dev.classmethod.jp/articles/nodejs-aws-sdk-v3-csv-streaming-in-s3/
Node.jsデザインパターン
https://www.oreilly.co.jp/books/9784873118734/