スタディサプリ Product Team Blog

株式会社リクルートが開発するスタディサプリのプロダクトチームのブログです

Resque + HPA から Argo Workflows へ — 並列処理を作り直した話

1. はじめに

こんにちは、スタディサプリで学校向けシステムを開発している @orcar です。

本記事では、学校からお預かりした成績 CSV ファイルを取り込むバッチ処理の並列実行方式を、Resque worker + HPA から Argo Workflows へ移行した話を書きます。

この記事で扱うこと

  • HPA を使ったバッチワーカーのスケールで起こった、scale-in ができないことによるコスト過大の問題
  • Argo Workflows の withSequence + parallelism を使った置き換えの設計
  • 実装と運用上の工夫

この記事で扱わないこと

  • Argo Workflows そのもののセットアップ方法
  • Resque の基本的な使い方

目次


2. 背景:何をするジョブなのか

私たちは、到達度テストというテストを提供しています。 学校で受検してもらった到達度テストの結果はCSVファイルとして別システムから連携され、連携されてきたCSVファイルをデータベースに取り込む処理を日次で定期実行しています。

  • データソース: 別システムから S3 に配置される CSV ファイル
  • 取り込みの単位: organization (学校法人 / 学校) 単位
  • 実行頻度: 日次

organization の数だけ取り込み処理があり、それぞれが独立して実行可能なため、並列化が効きやすいワークロードです。


3. 旧構成:Resque worker を HPA で並列処理

旧構成が実装された当時は、Argo Workflows の社内運用基盤はまだ整備されていませんでした。そのため、当時の現実的な選択肢として Resque worker を HPA で並列処理する 構成を採用していました。

3.1 アーキテクチャ

[Scheduler] --enqueue--> [Resque queue (Redis)]
                                  |
                                  v
                         [Resque worker Pod] x N  ← HPA でスケール

ジョブを enqueue する側 (Scheduler) と、それを処理する側 (Resque worker Pod) が分かれていて、worker 数を HPA で動的にスケールすることで並列度を上げていました。

HPA のスケール指標としては 「キュー内で最も古いジョブが待機している時間」を表すカスタムメトリクス を使っていました。CPU 使用率のような汎用メトリクスではなく、「処理しきれていないジョブが滞留していないか」を直接見るためです。

  • キューにジョブが積まれて時間が経つほど指標値が大きくなる → scale-out
  • 指標値が 0 にならない限り scale-in は発動せず、Pod 数は scale-out された水準のまま維持される(なぜ実態として指標値が 0 にならないのかは 3.2 で説明します)

3.2 旧構成の問題点

旧構成の問題点は、scale-in ができないことによるコストの増加 でした。端的に言えば、一度スケールアウトしたら、すべての処理が完了するまでスケールインしない(できない) という状況です。

3.1 で見たとおり、HPA は指標値が 0 にならない限り scale-in を発動しません。そして実態として、この指標は処理がすべて完了するまで 0 にならない ように意図的に選ばれていました。そのような選択がされたのには、HPA の構造的な制約による背景があります。

制約 1: scale-in 時に止める Pod はランダムに選ばれる

Kubernetes の HPA で scale-in が発動するとき、どの Pod を停止対象とするかは基本的にランダムに(あるいは Pod 起動順などの汎用ルールで)決まります。「いまこの Pod は団体 X の取り込みを処理中だから残す」といった処理状況は考慮されません。

制約 2: graceful shutdown も現実的でない

terminationGracePeriodSeconds を長く取って、処理が終わってから止まればよい」というアイデアもありますが、取り込み処理の所要時間は団体ごとに大きくばらつき、短いものは 30 分程度、長いものは 8 時間に及ぶケース もありました。

長時間に及ぶケースに合わせて terminationGracePeriodSeconds を 8 時間オーダーで設定するのは現実的ではなく、それより短い値にすれば結局は強制 kill で同じ問題が起きます。「ジョブが終わるまで待つ」アプローチは、ワークロードの幅が広すぎて成立しませんでした。

その帰結:Pod の張り付きとコストの増加

この 2 つの制約のため、運用上は 「処理がすべて完了するまで scale-in を発動させない」 という方針を取らざるを得ませんでした。それを実現する手段として選ばれていたのが、3.1 で触れた「キュー内最古ジョブの待機時間」をスケール指標にする構成です。処理対象が残っている間は指標値が下がらないため、HPA は scale-in を発動しません。

結果として、

  • 処理がすべて完了するまで、Pod 数は scale-out された水準に 張り付いたまま 維持される
  • ほとんどの worker が idle 状態でも Pod は減らない
  • スループットに寄与しない Pod 群が、コストだけ発生し続ける

という状況が生まれます。

つまり、根本的な問題は HPA そのものではなく、「処理中の Pod を任意のタイミングで安全に止められない」というバッチワークロード側の性質と、「Pod を任意のタイミングで止めうる」という HPA の性質のミスマッチ でした。HPA は 「Pod 数 ≒ 仕事の量」が成り立つロングランニングサービスには向いていますが、「途中で止められない処理を抱えたバッチ」とは相性が悪いのです。


4. 新しい設計の採用理由

3.2 で見た「scale-in ができない」問題に対して、最初は HPA を維持したまま改善できないか を検討しました。しかし最終的に、HPA を使い続けること自体に構造的な限界があると判断し、Spot インスタンス上で並列処理する方針 に切り替えています。本章では、その経緯と理由を説明します。

まずは HPA のチューニングを検討した(没案)

最初に検討したのは、HPA を残したまま scale-in を安全に発動させる工夫です:

  • スケール指標を「キュー長」など完了時にゼロになるものに変えて scale-in を発火させる
  • terminationGracePeriodSeconds を長く取り、worker が処理を終えてから停止できるようにする

しかし、いずれも根本問題を解けないとわかりました:

  • メトリクスを切り替えて scale-in を発火させても、Pod がランダムに選ばれて kill される根本問題は変わらない
  • graceful shutdown は団体ごとの処理時間が 30 分〜8 時間と幅広く、terminationGracePeriodSeconds を実用的な範囲に収められない

HPA をチューニングしても 「実行中のジョブの完了を待って終了する」保証は得られない のです。

そもそも HPA とこのワークロードは相性が悪い

団体ごとに処理時間が 30 分〜8 時間と大きく異なるワークロードを、HPA で扱うこと自体が構造的に相性が悪い のです。HPA は「Pod 数 ≒ 仕事の量」が成り立つロングランニングサービスの負荷追従には向いていますが、ワークロードのバラつきが大きいバッチ処理では:

  • 最遅ジョブを抱えた Pod が全体を引きずって scale-in を阻害する
  • かといって途中で Pod を kill すると in-flight ジョブが壊れる
  • 結果として「処理完了まで Pod が張り付き、コストだけ膨らむ」現象が避けられない

つまり、メトリクス設計や閾値調整といったパラメータレベルのチューニングでは、本質的な改善は得られないという結論に至り、HPA 以外の採用も含めて再検討する ことにしました。

Spot インスタンス上でジョブごとに並列処理する方針へ(採用案)

そこで方針を切り替え、Spot インスタンス上でジョブ単位に Pod を起動し、処理が終われば自然に破棄する 並列処理に移行することにしました。実装手段としては、社内に既に運用基盤がある Argo Workflows の withSequence + parallelism を採用しています。

この方針には、消極的な理由(問題回避)と積極的な理由(新しい能力獲得)の両方があります。

消極的な理由(問題回避)

  • ジョブが終わったら Pod が 自然に消える:処理中の Pod を外から kill する必要がないので、scale-in の安全性問題そのものが発生しない
  • HPA のチューニングや graceful shutdown の延命策に頼らずに済む

積極的な理由(新しい能力獲得)

  • 団体ごとのワークロード差(30 分〜8 時間)を、Spot インスタンス上でジョブごとに起動して並列処理する 構成が自然に組める。Argo Workflows の Pod は単発で起動して処理が終わったら消えるため、Spot ノードの「いつ中断されるかわからない」性質を許容しやすく、長時間ロングランニングする HPA + Resque 構成より Spot との相性がよい
  • 並列度を parallelism: N宣言的に書ける(HPA の maxReplicas 相当の制御)
  • 既に社内に Argo Workflows の運用基盤があるため、Slack 通知・失敗時のアラート・UI による進捗確認などの共通機能にそのまま乗れる

💡 メンタルモデル

Argo Workflows は Push 型ジョブ:ワークフローが「この処理を index 0 から N-1 まで並列で実行する」と push し、各 index に対応する Pod を起動する。Pod は担当した処理が終わったら自然に終了する。処理中の Pod を外から kill する仕組み(HPA の scale-in)が登場しないため、「実行中ジョブを誤って kill するリスク」自体が構造的に存在しない

旧構成は「scale-in したいができない」というジレンマを抱えていましたが、Argo Workflows では そもそも scale-in という操作を必要としません。さらに、Pod の単発起動は Spot インスタンス活用とも噛み合い、ワークロードの幅が大きいバッチを Pod 単位で必要なときだけ起動する という運用に切り替えられます。


5. 新構成:Argo Workflows での実装

5.1 全体像

[CronWorkflow]
        |
        v
[get-organization-ids]   ← 対象 organization の一覧を出力
        |
        v
[import-files] x N  (parallelism: 20)   ← 並列に取り込み

シンプルな 2 ステップ構成です。

5.2 manifest 全体

社内固有の設定値は省略していますが、全体像は以下です。main テンプレートが全体フロー、get-organization-ids がステップ 1、import-files がステップ 2 にあたります。

# ※ 本記事用に作成した manifest であり、workflow 名・schedule・Runner クラス名・image などは
#   実際のものとは異なるダミー値です。並列処理の構造を理解するための参考として読んでください。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: import-files-by-organization-id
spec:
  schedules:
    - "0 0 * * *"  # 日次 0:00 (JST) に実行
  timezone: Asia/Tokyo
  concurrencyPolicy: Allow
  workflowSpec:
    entrypoint: main
    activeDeadlineSeconds: 28800  # 8 時間で workflow を打ち切る (ランナウェイ防止)
    templates:
      # ───── 全体フロー ─────────────────────────────
      - name: main
        steps:
          # ステップ 1: 取り込み対象の organization 一覧を列挙
          - - name: get-organization-ids
              template: get-organization-ids
          # ステップ 2: ステップ 1 の出力を index 単位に展開して並列取り込み
          - - name: import-files
              template: import-files
              arguments:
                parameters:
                  # withSequence が展開する 0..count-1 の整数値をそのまま渡す
                  - name: organization-ids-index
                    value: "{{item}}"
                artifacts:
                  # ステップ 1 が出力した organization 一覧を artifact として渡す
                  - name: organization-ids
                    from: "{{steps.get-organization-ids.outputs.artifacts.organization-ids}}"
              withSequence:
                # 0..count-1 の整数を {{item}} に展開する
                count: "{{steps.get-organization-ids.outputs.parameters.count}}"
        # 同時に走る import-files Pod の上限。HPA の maxReplicas に相当する概念
        parallelism: 20

      # ───── ステップ 1: 対象 organization の列挙 ───
      - name: get-organization-ids
        container:
          image: <image>
          command:
            - bin/rails
            - runner
            - ListTargetOrganizationIdsRunner.new.run  # organization の一覧を /tmp に書き出す処理
        outputs:
          artifacts:
            # 対象 organization の ID 一覧 (JSON 配列)
            - name: organization-ids
              path: /tmp/organization_ids.json
          parameters:
            # 一覧の件数。後段の withSequence.count から参照される
            - name: count
              valueFrom:
                path: /tmp/organization_ids_count

      # ───── ステップ 2: 並列取り込み (Pod ごとに 1 organization を処理) ──
      - name: import-files
        inputs:
          parameters:
            # withSequence が展開した整数 index
            - name: organization-ids-index
          artifacts:
            # ステップ 1 が出力した organization 一覧をこの Pod のローカルに展開
            - name: organization-ids
              path: /tmp/organization_ids.json
        container:
          image: <image>
          command:
            - bin/rails
            - runner
            - ImportFilesRunner.new.run  # 自分の担当 index の organization を取り出して取り込む処理
          env:
            # 担当する index を環境変数として渡す
            - name: ORGANIZATION_IDS_INDEX
              value: "{{inputs.parameters.organization-ids-index}}"

ポイントは、ステップ 1 が出力した artifact (/tmp/organization_ids.json) と件数 (count) を、ステップ 2 が withSequence + {{item}} で受け取るところです。withSequence が 0 から count - 1 までの整数を {{item}} に展開し、その数だけ Pod が並列に起動します。各 Pod は受け取った index を使って artifact から該当の organization を取り出し、取り込み処理を実行します。

💡 補足

Argo Workflows には、リストをそのまま展開する withItems / withParam というより簡潔な書き方もあります。ただし withParam で渡せるパラメータには上限があり、今回のユースケースでは organization 数が増えたときに上限を超える可能性があったため、index を withSequence で展開し、実データは artifact 経由で共有する方式を採用しました。

5.3 設計判断: parallelism: 20 の意味

parallelism: 20 は、この steps の中で 同時に走る Pod の数の上限を表します。HPA の maxReplicas に相当する概念ですが、決定的な違いがあります:

  • HPA: 「現在の Pod 数」をメトリクスで増減させる連続的な値。scale-in 時にどの Pod を kill するかは処理状況を考慮しない
  • Argo: 「同時に並列で起動できる Pod 数」の上限。各 Pod は自分の担当が終わったら自然終了し、空いた枠で次の index が起動する

値の決め方は 旧構成 (Resque worker + HPA) の maxReplicas に揃える という方針を取りました。

  • 移行時に並列度を変えてしまうと、性能特性や下流(DB / 外部リソース)への負荷も同時に変わってしまい、移行の効果を切り分けにくくなる
  • 過去にチューニング済みの値をそのまま踏襲できる(DB 接続数や上流負荷を踏まえて Resque 時代に決められた値)
  • 「まずは同じ並列度で動くこと」を確認してから、必要に応じて調整する方針

5.4 運用面の工夫

実装そのもの以外で、運用上効いた工夫を挙げます。

失敗時の Slack 通知

社内の Argo 基盤には、Workflow に専用の annotation を付与するだけで失敗時に Slack に通知が飛ぶ仕組みが整備されており、これに乗ることができました。既存の仕組みに乗れることの嬉しさの一例です。

ランナウェイ防止

日次で取り込みジョブが実行されているため、翌日のジョブ起動時刻まで動くと困ります。そのため 5.2 の manifest で示した通り activeDeadlineSeconds に 28800(8 時間)を設定し、最悪のケースでも翌日の起動までに workflow が打ち切られるようにしています。


6. まとめ

  • HPA は 長く動き続けるワーカー のスケールには向いているが、途中で安全に止められない処理を Pod に抱えたバッチには、Pod が自然終了するジョブ志向ツール(Argo Workflows 等)が合う
  • withSequence + parallelism + artifact による index 配布は、並列バッチの定番パターンとして再利用しやすい
  • 既存の Argo 基盤(Slack 通知 / UI / 監視)に乗ることで、移行コストを大きく下げられた