1. はじめに
こんにちは、スタディサプリで学校向けシステムを開発している @orcar です。
本記事では、学校からお預かりした成績 CSV ファイルを取り込むバッチ処理の並列実行方式を、Resque worker + HPA から Argo Workflows へ移行した話を書きます。
この記事で扱うこと
- HPA を使ったバッチワーカーのスケールで起こった、scale-in ができないことによるコスト過大の問題
- Argo Workflows の
withSequence+parallelismを使った置き換えの設計 - 実装と運用上の工夫
この記事で扱わないこと
- Argo Workflows そのもののセットアップ方法
- Resque の基本的な使い方
目次
- 1. はじめに
- 目次
- 2. 背景:何をするジョブなのか
- 3. 旧構成:Resque worker を HPA で並列処理
- 4. 新しい設計の採用理由
- 5. 新構成:Argo Workflows での実装
- 6. まとめ
2. 背景:何をするジョブなのか
私たちは、到達度テストというテストを提供しています。 学校で受検してもらった到達度テストの結果はCSVファイルとして別システムから連携され、連携されてきたCSVファイルをデータベースに取り込む処理を日次で定期実行しています。
- データソース: 別システムから S3 に配置される CSV ファイル
- 取り込みの単位: organization (学校法人 / 学校) 単位
- 実行頻度: 日次
organization の数だけ取り込み処理があり、それぞれが独立して実行可能なため、並列化が効きやすいワークロードです。
3. 旧構成:Resque worker を HPA で並列処理
旧構成が実装された当時は、Argo Workflows の社内運用基盤はまだ整備されていませんでした。そのため、当時の現実的な選択肢として Resque worker を HPA で並列処理する 構成を採用していました。
3.1 アーキテクチャ
[Scheduler] --enqueue--> [Resque queue (Redis)]
|
v
[Resque worker Pod] x N ← HPA でスケール
ジョブを enqueue する側 (Scheduler) と、それを処理する側 (Resque worker Pod) が分かれていて、worker 数を HPA で動的にスケールすることで並列度を上げていました。
HPA のスケール指標としては 「キュー内で最も古いジョブが待機している時間」を表すカスタムメトリクス を使っていました。CPU 使用率のような汎用メトリクスではなく、「処理しきれていないジョブが滞留していないか」を直接見るためです。
- キューにジョブが積まれて時間が経つほど指標値が大きくなる → scale-out
- 指標値が 0 にならない限り scale-in は発動せず、Pod 数は scale-out された水準のまま維持される(なぜ実態として指標値が 0 にならないのかは 3.2 で説明します)
3.2 旧構成の問題点
旧構成の問題点は、scale-in ができないことによるコストの増加 でした。端的に言えば、一度スケールアウトしたら、すべての処理が完了するまでスケールインしない(できない) という状況です。
3.1 で見たとおり、HPA は指標値が 0 にならない限り scale-in を発動しません。そして実態として、この指標は処理がすべて完了するまで 0 にならない ように意図的に選ばれていました。そのような選択がされたのには、HPA の構造的な制約による背景があります。
制約 1: scale-in 時に止める Pod はランダムに選ばれる
Kubernetes の HPA で scale-in が発動するとき、どの Pod を停止対象とするかは基本的にランダムに(あるいは Pod 起動順などの汎用ルールで)決まります。「いまこの Pod は団体 X の取り込みを処理中だから残す」といった処理状況は考慮されません。
制約 2: graceful shutdown も現実的でない
「terminationGracePeriodSeconds を長く取って、処理が終わってから止まればよい」というアイデアもありますが、取り込み処理の所要時間は団体ごとに大きくばらつき、短いものは 30 分程度、長いものは 8 時間に及ぶケース もありました。
長時間に及ぶケースに合わせて terminationGracePeriodSeconds を 8 時間オーダーで設定するのは現実的ではなく、それより短い値にすれば結局は強制 kill で同じ問題が起きます。「ジョブが終わるまで待つ」アプローチは、ワークロードの幅が広すぎて成立しませんでした。
その帰結:Pod の張り付きとコストの増加
この 2 つの制約のため、運用上は 「処理がすべて完了するまで scale-in を発動させない」 という方針を取らざるを得ませんでした。それを実現する手段として選ばれていたのが、3.1 で触れた「キュー内最古ジョブの待機時間」をスケール指標にする構成です。処理対象が残っている間は指標値が下がらないため、HPA は scale-in を発動しません。
結果として、
- 処理がすべて完了するまで、Pod 数は scale-out された水準に 張り付いたまま 維持される
- ほとんどの worker が idle 状態でも Pod は減らない
- スループットに寄与しない Pod 群が、コストだけ発生し続ける
という状況が生まれます。
つまり、根本的な問題は HPA そのものではなく、「処理中の Pod を任意のタイミングで安全に止められない」というバッチワークロード側の性質と、「Pod を任意のタイミングで止めうる」という HPA の性質のミスマッチ でした。HPA は 「Pod 数 ≒ 仕事の量」が成り立つロングランニングサービスには向いていますが、「途中で止められない処理を抱えたバッチ」とは相性が悪いのです。
4. 新しい設計の採用理由
3.2 で見た「scale-in ができない」問題に対して、最初は HPA を維持したまま改善できないか を検討しました。しかし最終的に、HPA を使い続けること自体に構造的な限界があると判断し、Spot インスタンス上で並列処理する方針 に切り替えています。本章では、その経緯と理由を説明します。
まずは HPA のチューニングを検討した(没案)
最初に検討したのは、HPA を残したまま scale-in を安全に発動させる工夫です:
- スケール指標を「キュー長」など完了時にゼロになるものに変えて scale-in を発火させる
terminationGracePeriodSecondsを長く取り、worker が処理を終えてから停止できるようにする
しかし、いずれも根本問題を解けないとわかりました:
- メトリクスを切り替えて scale-in を発火させても、Pod がランダムに選ばれて kill される根本問題は変わらない
- graceful shutdown は団体ごとの処理時間が 30 分〜8 時間と幅広く、
terminationGracePeriodSecondsを実用的な範囲に収められない
HPA をチューニングしても 「実行中のジョブの完了を待って終了する」保証は得られない のです。
そもそも HPA とこのワークロードは相性が悪い
団体ごとに処理時間が 30 分〜8 時間と大きく異なるワークロードを、HPA で扱うこと自体が構造的に相性が悪い のです。HPA は「Pod 数 ≒ 仕事の量」が成り立つロングランニングサービスの負荷追従には向いていますが、ワークロードのバラつきが大きいバッチ処理では:
- 最遅ジョブを抱えた Pod が全体を引きずって scale-in を阻害する
- かといって途中で Pod を kill すると in-flight ジョブが壊れる
- 結果として「処理完了まで Pod が張り付き、コストだけ膨らむ」現象が避けられない
つまり、メトリクス設計や閾値調整といったパラメータレベルのチューニングでは、本質的な改善は得られないという結論に至り、HPA 以外の採用も含めて再検討する ことにしました。
Spot インスタンス上でジョブごとに並列処理する方針へ(採用案)
そこで方針を切り替え、Spot インスタンス上でジョブ単位に Pod を起動し、処理が終われば自然に破棄する 並列処理に移行することにしました。実装手段としては、社内に既に運用基盤がある Argo Workflows の withSequence + parallelism を採用しています。
この方針には、消極的な理由(問題回避)と積極的な理由(新しい能力獲得)の両方があります。
消極的な理由(問題回避)
- ジョブが終わったら Pod が 自然に消える:処理中の Pod を外から kill する必要がないので、scale-in の安全性問題そのものが発生しない
- HPA のチューニングや graceful shutdown の延命策に頼らずに済む
積極的な理由(新しい能力獲得)
- 団体ごとのワークロード差(30 分〜8 時間)を、Spot インスタンス上でジョブごとに起動して並列処理する 構成が自然に組める。Argo Workflows の Pod は単発で起動して処理が終わったら消えるため、Spot ノードの「いつ中断されるかわからない」性質を許容しやすく、長時間ロングランニングする HPA + Resque 構成より Spot との相性がよい
- 並列度を
parallelism: Nで 宣言的に書ける(HPA のmaxReplicas相当の制御) - 既に社内に Argo Workflows の運用基盤があるため、Slack 通知・失敗時のアラート・UI による進捗確認などの共通機能にそのまま乗れる
💡 メンタルモデル
Argo Workflows は Push 型ジョブ:ワークフローが「この処理を index 0 から N-1 まで並列で実行する」と push し、各 index に対応する Pod を起動する。Pod は担当した処理が終わったら自然に終了する。処理中の Pod を外から kill する仕組み(HPA の scale-in)が登場しないため、「実行中ジョブを誤って kill するリスク」自体が構造的に存在しない。
旧構成は「scale-in したいができない」というジレンマを抱えていましたが、Argo Workflows では そもそも scale-in という操作を必要としません。さらに、Pod の単発起動は Spot インスタンス活用とも噛み合い、ワークロードの幅が大きいバッチを Pod 単位で必要なときだけ起動する という運用に切り替えられます。
5. 新構成:Argo Workflows での実装
5.1 全体像
[CronWorkflow]
|
v
[get-organization-ids] ← 対象 organization の一覧を出力
|
v
[import-files] x N (parallelism: 20) ← 並列に取り込み
シンプルな 2 ステップ構成です。
5.2 manifest 全体
社内固有の設定値は省略していますが、全体像は以下です。main テンプレートが全体フロー、get-organization-ids がステップ 1、import-files がステップ 2 にあたります。
# ※ 本記事用に作成した manifest であり、workflow 名・schedule・Runner クラス名・image などは # 実際のものとは異なるダミー値です。並列処理の構造を理解するための参考として読んでください。 apiVersion: argoproj.io/v1alpha1 kind: CronWorkflow metadata: name: import-files-by-organization-id spec: schedules: - "0 0 * * *" # 日次 0:00 (JST) に実行 timezone: Asia/Tokyo concurrencyPolicy: Allow workflowSpec: entrypoint: main activeDeadlineSeconds: 28800 # 8 時間で workflow を打ち切る (ランナウェイ防止) templates: # ───── 全体フロー ───────────────────────────── - name: main steps: # ステップ 1: 取り込み対象の organization 一覧を列挙 - - name: get-organization-ids template: get-organization-ids # ステップ 2: ステップ 1 の出力を index 単位に展開して並列取り込み - - name: import-files template: import-files arguments: parameters: # withSequence が展開する 0..count-1 の整数値をそのまま渡す - name: organization-ids-index value: "{{item}}" artifacts: # ステップ 1 が出力した organization 一覧を artifact として渡す - name: organization-ids from: "{{steps.get-organization-ids.outputs.artifacts.organization-ids}}" withSequence: # 0..count-1 の整数を {{item}} に展開する count: "{{steps.get-organization-ids.outputs.parameters.count}}" # 同時に走る import-files Pod の上限。HPA の maxReplicas に相当する概念 parallelism: 20 # ───── ステップ 1: 対象 organization の列挙 ─── - name: get-organization-ids container: image: <image> command: - bin/rails - runner - ListTargetOrganizationIdsRunner.new.run # organization の一覧を /tmp に書き出す処理 outputs: artifacts: # 対象 organization の ID 一覧 (JSON 配列) - name: organization-ids path: /tmp/organization_ids.json parameters: # 一覧の件数。後段の withSequence.count から参照される - name: count valueFrom: path: /tmp/organization_ids_count # ───── ステップ 2: 並列取り込み (Pod ごとに 1 organization を処理) ── - name: import-files inputs: parameters: # withSequence が展開した整数 index - name: organization-ids-index artifacts: # ステップ 1 が出力した organization 一覧をこの Pod のローカルに展開 - name: organization-ids path: /tmp/organization_ids.json container: image: <image> command: - bin/rails - runner - ImportFilesRunner.new.run # 自分の担当 index の organization を取り出して取り込む処理 env: # 担当する index を環境変数として渡す - name: ORGANIZATION_IDS_INDEX value: "{{inputs.parameters.organization-ids-index}}"
ポイントは、ステップ 1 が出力した artifact (/tmp/organization_ids.json) と件数 (count) を、ステップ 2 が withSequence + {{item}} で受け取るところです。withSequence が 0 から count - 1 までの整数を {{item}} に展開し、その数だけ Pod が並列に起動します。各 Pod は受け取った index を使って artifact から該当の organization を取り出し、取り込み処理を実行します。
💡 補足
Argo Workflows には、リストをそのまま展開する
withItems/withParamというより簡潔な書き方もあります。ただしwithParamで渡せるパラメータには上限があり、今回のユースケースでは organization 数が増えたときに上限を超える可能性があったため、index をwithSequenceで展開し、実データは artifact 経由で共有する方式を採用しました。
5.3 設計判断: parallelism: 20 の意味
parallelism: 20 は、この steps の中で 同時に走る Pod の数の上限を表します。HPA の maxReplicas に相当する概念ですが、決定的な違いがあります:
- HPA: 「現在の Pod 数」をメトリクスで増減させる連続的な値。scale-in 時にどの Pod を kill するかは処理状況を考慮しない
- Argo: 「同時に並列で起動できる Pod 数」の上限。各 Pod は自分の担当が終わったら自然終了し、空いた枠で次の index が起動する
値の決め方は 旧構成 (Resque worker + HPA) の maxReplicas に揃える という方針を取りました。
- 移行時に並列度を変えてしまうと、性能特性や下流(DB / 外部リソース)への負荷も同時に変わってしまい、移行の効果を切り分けにくくなる
- 過去にチューニング済みの値をそのまま踏襲できる(DB 接続数や上流負荷を踏まえて Resque 時代に決められた値)
- 「まずは同じ並列度で動くこと」を確認してから、必要に応じて調整する方針
5.4 運用面の工夫
実装そのもの以外で、運用上効いた工夫を挙げます。
失敗時の Slack 通知
社内の Argo 基盤には、Workflow に専用の annotation を付与するだけで失敗時に Slack に通知が飛ぶ仕組みが整備されており、これに乗ることができました。既存の仕組みに乗れることの嬉しさの一例です。
ランナウェイ防止
日次で取り込みジョブが実行されているため、翌日のジョブ起動時刻まで動くと困ります。そのため 5.2 の manifest で示した通り activeDeadlineSeconds に 28800(8 時間)を設定し、最悪のケースでも翌日の起動までに workflow が打ち切られるようにしています。
6. まとめ
- HPA は 長く動き続けるワーカー のスケールには向いているが、途中で安全に止められない処理を Pod に抱えたバッチには、Pod が自然終了するジョブ志向ツール(Argo Workflows 等)が合う
withSequence+parallelism+ artifact による index 配布は、並列バッチの定番パターンとして再利用しやすい- 既存の Argo 基盤(Slack 通知 / UI / 監視)に乗ることで、移行コストを大きく下げられた