はじめに
こんにちは。Data Engineer の @shase です。
弊社ではいくつかのユースケースでCloud Composer(Airflow)を使っているのですが、今回はデータチームで開発している、分析者向けBigQuery SQL実行基盤(社内の通称はSaved Query Workflow)について紹介します。
このシステムは今年の春から動いているものです。
システム概要
今回紹介するシステムの概要です。
- 分析者はSQLとYAMLをGitHubにコミットしてPRを作成します。
- エンジニアがレビューをします。
- Cloud ComposerでSQLがスケジュール実行され、結果がGoogle Sheets などに出力されます。
背景
組織全体のKPI集計やレポーティングとは別に、分析者個人や特定のチームが使うテーブルやレポートを定期的に作成する場合を想定したユースケースとして、分析者向けBigQuery SQL実行基盤をつくりました。
BigQueryを普段お使いの方ですと、Scheduled query で問題ないのでは?と思われる方も多いかもしれません。しかし、Scheduled queryでは実現できない以下のようなことのために、今回はCloud Composerを使ったSQL実行基盤を用意することにしました。
(Cloud Composerの採用自体は、弊社のDWHをBigQueryに移管するProjectで採用されていたから、という理由が大きいです。)
狙い
クエリ結果の任意のGoogle Sheetsへの自動保存
- これはSchedule queryだけでは実現できない。
利用イメージ
通常、Airflowでは、PythonファイルでDAGを構成します。
今回の基盤では分析者が必ずしもPythonに習熟していないことを想定し、YAMLファイルとSQLファイルだけを書いてコミットすれば、BigQueryに対してSQLがスケジュール実行される、という状態を目指しました。
job: schedule: "0 1 * * *" tasks: - name: shase_daily_foo1_bq_check bq_sql: shase_daily_foo1_bq_check.sql bq_destination_table: "foo-sandbox.shase.foo1" bq_mode: replace spreadsheet_id: "xxx" spreadsheet_range: "Sheet1!A1" - name: shase_daily_foo2_bq_check bq_sql: shase_daily_foo2_bq_check.sql bq_destination_table: "foo-sandbox.shase.foo2" bq_mode: replace - name: shase_daily_bar_bq_check bq_sql: shase_daily_bar_bq_check.sql bq_destination_table: "foo-sandbox.shase.bar" bq_mode: append spreadsheet_id: "xxx" spreadsheet_mode: append
YAMLファイルでは実行スケジュール、実行するSQLファイル、BigQueryのoutput先テーブル、output先のGoogle SheetのIDなどを指定しています。
YAMLからAirflowのDAGの生成
Airflowでは動的にDAGを生成することができます。先程コミットした、YAMLをparseして、Airflow の SubDAGとしています。
UIからの見え方は以下のようになります。
Google Sheets連携
Google Sheets 連携には、Embulkのoutput pluginを自作して利用しています。 (少し検索したのですが、良い既存のpluginがなかったというのもあります)
ユースケースが社内利用に少し向きすぎていると感じる部分もありますが、もしよかったら使っていただけるとうれしいです ^^
一応 ^ こちらで公開しています。
今後の課題
現在は徐々に社内の利用者が増えているフェーズです。
CIやCDにはまだまだ改善の余地があり、このあたりは充実させていきたいと考えています。
データチームでは、分析環境の継続的な改善に取り組んでいます 💪