こんにちは、LOGICAプロダクトグループの篠原です。
先月末に約10年振りにARMORED COREの新作が出ました。 以前からAC乗りはやっていたので久しぶりにコントローラーを持ち3周目まで完了した感想ですがやはりハイスピードメカバトルはよい物です。
閑話休題、私の普段の業務の中で、動画のメタ情報の取り込みやユーザ、決済などの各種データ状態変更などをバックエンド側から取り扱うことが多く、運用状況を見ながら改善に取り組んでいます。
そこで今回は非同期処理としてsidekiqを使っているので、運用するまでにつまづいたポイントをまとめてみようかと思いました。
sidekiqとは
sidekiqとは主にRailsアプリで非同期処理を行うためのgemです。
複数のジョブを同時に実行することができ、Redisにタスクをキューイングすることによって順次タスクを消化することができます。 詳しい説明については公式のドキュメントを参照ください。
バージョンは7.0.3を利用する前提で記述しています。
設定項目について
設定項目はシンプルでステージング用、本番用と環境毎に設定することも可能です。
config/sidekiq.yml
:concurrency: 10 # 1つのsidekiqプロセスで使用するスレッド数、デフォルトは5 staging: :concurrency: 35 # ステージング用 production: :concurrency: 45 # 本番用 :queues: # 実行するworker名を記載 - data_import_worker - data_update_worker - status_update_worker
起動の仕方
以下のコマンドで実行可能です。
bundle exec sidekiq
運用を想定した上で検証するために用意したこと
200箇所ほどの拠点からデータ更新のために、S3に対して更新用ファイルを随時配置してもらい定期的にチェックして更新するという要件がありました。
5分毎に取り込み処理を起動して、S3に配置された各ファイル内にある更新日が更新されていたら更新項目を読み込んだ上で、対象データを書き換えるという処理を検証用として作りました。
実際の検証
つまづいたポイントその1
ステージング環境でsidekiqを起動して、いざ5分毎の設定で当該バッチを動かしてみたところ以下のエラーが発生
ActiveRecord::ConnectionTimeoutError: could not obtain a database connection within 5.000 seconds (waited 5.019 seconds)
DBへのConnectionTimeoutErrorとのことなので調べてみたところ、コネクションプールの数を確保する必要があるとのこと。 デフォルトの5になっていたので、ステージング用のconcurrencyに設定した値にはsidekiqプロセス用に+1した値を確保しました。
database.yml
staging: <<: *default reconnect: true pool: 36
その後、動かしてみるとConnectionTimeoutErrorが発生しないことを確認できました。
つまづいたポイントその2
ConnectionTimeoutErrorが収まったので再度キューの状況を確認するとdata_import_workerは順次処理されるが、data_update_workerの方はキューが処理されず一定時間経過後に処理が行われる事象が発生したので内容を確認しました。
どうやらdata_import_workerでconcurrencyに設定した実行枠を全て消費してしまい、他のworker処理がキューイングされるが実行枠に空きができるまで待っているのでキューが消費できずスタックしてしまっている模様です。
古いバージョンではsidekiq.yml内でlimitsオプションで区切ることで対応していたのですが 現行バージョンでは、カプセル化でどのworkerにいくつconcurrencyを割り当てるか制御できるようです。 そこで下記のように割り当てる内容を記述しました。
initializers/sidekiq.rb
Sidekiq.configure_server do |config| config.capsule("data_import_worker") do |cap| cap.concurrency = 30 cap.queues = %w[data_import_worker] end config.capsule("data_update_worker") do |cap| cap.concurrency = 5 cap.queues = %w[data_update_worker] end config.capsule("status_update_worker") do |cap| cap.concurrency = 1 cap.queues = %w[status_update_worker] end end
上記設定後、sidekiqを再起動して検証してログを確認してみると それぞれdata_import_workerで30ずつ、data_update_workerで5ずつキューが消費されることを確認しました。
まとめ
今回は主に最初につまづきそうな点をまとめてみました。 sidekiq自体は実行するタスクを指定するだけなのですが、それを用いるための実際の処理はちゃんとキューの消化に見合うのかサーバのスペックで処理仕切れるのかと設計・実装の観点からチェックすることも重要です。
この後、実施に運用してみたのですがworker本体の処理が重いため5分以内に処理が捌けずキューがスタックしまくって後に発行したキューが遅延したり、メモリを消費し続けてキャパシティアラートが延々と出たりと色々対応したこともあるのですがまた機会があればお話しできればと思います。