スタディサプリ Product Team Blog

株式会社リクルートが開発するスタディサプリのプロダクトチームのブログです

Cloud Dataflow で実現する柔軟なデータパイプライン

はじめに

こんにちは、@shase です。

スタディサプリでは、データパイプラインのツールとして、従来 AWS Kinesis Stream や、Embulk や、AWS Lambda などがよく使われてきました。

ただ、現在開発中のプロジェクトでは、システム間の連携の為、Cloud Pub/Sub が多用されているということもあり、データパイプライン Cloud Pub/Subとの親和性が高いCloud Dataflowを一部取り入れています。

本記事では Cloud Dataflow 自体は詳述しませんが、簡単に説明させていただくと、Cloud Dataflowとは、GCP が提供するマネージドな Apache Beam の実行環境になります。

Cloud Dataflow のメリット

Cloud Dataflow(Apache Beam)には、以下のようなメリットを感じています。

  • ストリーミング処理が可能であること
    • データの逐次連携が可能になり、ミニバッチ的な処理より準同期的にデータ連携ができるのはうれしいです。
  • ストリーミング途中にロジックを挟めること
    • データをストアするまでに、ロジックを挟めることがうれしいです。日付を加工したり、形式を加工したりなどデータパイプラインでできると有り難いロジックは多いです。
  • マネージド、かつスケーラビリティがあること
    • Apache Beam の実行基盤を自前で維持するのはそれなりに手間なので、スケーラビリティのあるマネージドサービスが使えるのは大変うれしいです。

この仕組みは、将来的に単にデータデータ連携というだけではなく、MLの為のデータパイプラインのベースになると考えています。

技術選定の詳細

どのSDKを使うか

Cloud Dataflow(Apache Beam) は、プログラマブルにデータパイプラインを記述することができるものです。

Dataflow の ドキュメントによると、現在は Java SDKPython SDK が対応しています。

それぞれの SDK で対応している GCPコンポーネントが異なり、使う前に確認が必要です。

データチームでは、 Python を主に使っているのですが、今回は BigtablePython SDK で Supported ではないので、Java SDK を使うことにしました。

ただ、Java SDK でもいくつか選択肢があり、

  • Java SDK を使って Java で記述する
  • Java SDK を使って、scio(後述)を使って、Scala で記述する

などの選択肢があります。

scio は、 Spotify が開発している Apache Beam のパイプラインを Scala で記述するためのライブラリです。

github.com

今回、少し検証の時間をもらったので、Java でそのまま記述するパターンと、scio を使ってSala で記述するパターンの両方でプロトタイピングをしてみました。

Java or Scala

結論からいうと、シンプルなパイプラインを記述する分には、Java でも Scala でも遜色なく記述することができました。

しかし、Scala (scio) を使ったパターンは、あまりにも情報が少なく(ほぼ公式サイトぐらいの情報しかない)、(散々試行錯誤した)自分が実装したものを引き継ぐという観点で、問題があると感じました(Javaのほうは、公式サイト、Stackoverflow、GitHub などに参考にできる情報が数多くありました)。

というわけで、今回は保守性の観点から Java で記述することにしました。

サンプルコード

実際の実装はビジネスロジックが入っているので、あくまでもサンプルですが、Java で記述した場合、コードは以下のようになります。

処理としては

  • Pub/Sub からデータを取り出す
  • Bigtable の形式にTransformする
  • Bigtable に格納する
.

├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── org
│   │   │       └── example
│   │   │           └── Sample.java

Sample.java

package org.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PDone;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.LinkedHashMap;
import java.util.Map;

public class Sample {

    static class GeneralTransformFn extends DoFn<String, Mutation> {
        String bigtableCf = "";

        GeneralTransformFn(String bigtableCf) {
            this.bigtableCf = bigtableCf;
        }

        @ProcessElement
        public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
            try {
                ObjectMapper mapper = new ObjectMapper();
                Map<String, String> jsonValue = mapper.readValue(rowkey, LinkedHashMap.class);

                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(jsonValue.get("id")));

                jsonValue.entrySet().stream().forEach(
                        e ->
                                row.addColumn(
                                        Bytes.toBytes(this.bigtableCf),
                                        Bytes.toBytes(e.getKey()),
                                        timestamp,
                                        Bytes.toBytes(e.getValue()))
                );
                out.output(row);
            } catch (Exception e) {
                // 例外処理
            }
        }
    }

    public static interface StreamOptions extends DataflowPipelineOptions {
        @Validation.Required
        ValueProvider<String> getProjectId();

        void setProjectId(ValueProvider<String> value);

        @Validation.Required
        ValueProvider<String> getInputSubscription();

        void setInputSubscription(ValueProvider<String> value);

        @Validation.Required
        ValueProvider<String> getBigtableInstance();

        void setBigtableInstance(ValueProvider<String> value);

        @Validation.Required
        ValueProvider<String> getBigtableTableId();

        void setBigtableTableId(ValueProvider<String> value);

        @Validation.Required
        ValueProvider<String> getDataflowStagingLocation();

        void setDataflowStagingLocation(ValueProvider<String> value);

        @Validation.Required
        ValueProvider<String> getDataflowTempLocation();

        void setDataflowTempLocation(ValueProvider<String> value);
    }

    public static void main(String[] args) {

        StreamOptions streamOptions = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(StreamOptions.class);

        PipelineOptions options = PipelineOptionsFactory.create();

        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject(streamOptions.getProjectId().toString());
        dataflowOptions.setNumWorkers(1);
        dataflowOptions.setRegion("asia-northeast1");
        dataflowOptions.setStreaming(true);
        dataflowOptions.setJobName("sample-job");
        dataflowOptions.setStagingLocation(streamOptions.getDataflowStagingLocation().toString());
        dataflowOptions.setGcpTempLocation(streamOptions.getDataflowTempLocation().toString());

        CloudBigtableTableConfiguration bigtableTableConfig =
                new CloudBigtableTableConfiguration.Builder()
                        .withProjectId(streamOptions.getProjectId().toString())
                        .withInstanceId(streamOptions.getBigtableInstance().toString())
                        .withTableId(streamOptions.getBigtableTableId().toString())
                        .build();

        Pipeline p = Pipeline.create(dataflowOptions);
        PDone pCol = p.apply("read", PubsubIO.readStrings().fromSubscription(streamOptions.getInputSubscription().toString()))
                .apply("transform", ParDo.of(new GeneralTransformFn("sample")))
                .apply("write", CloudBigtableIO.writeToTable(bigtableTableConfig));
        p.run();
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sample-app</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>2.31.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>2.31.0</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
            <version>2.31.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-core-java</artifactId>
            <version>2.31.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>2.31.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.bigtable</groupId>
            <artifactId>bigtable-hbase-beam</artifactId>
            <version>1.23.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.4</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.8.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.8.2</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.0.0</version>
            </plugin>
        </plugins>
    </build>

</project>

ビルドとデプロイ

Dataflow の実行方式はいくつかあります。

  • 従来のテンプレートをデプロイして、Jobを作成する方法
  • Flex テンプレートをデプロイして、Job を作成する方法
  • Direct 実行

それぞれメリット・デメリットがあります。テンプレートを用いた方式は、同じようで少し違うJobを多く作成する場合に便利だと思います。今回はさほど多くの Job を作成したいというわけでもないので、Direct 実行しています。

以下の環境が必要です。

今回作成したコードのビルドとデプロイは以下のようになります。

動作につかう、サービスアカウントには、Dataflow の実行に必要な権限を付与して、GOOGLE_APPLICATION_CREDENTIALS 環境変数にセットしてください。

$ export GOOGLE_APPLICATION_CREDENTIALS=./credential.json
$ mvn compile exec:java \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.mainClass="org.example.Sample" \
  -Dexec.args=" \
  --projectId=foobar-dev \
  --inputSubscription=projects/foobar-dev/subscriptions/shase_hello-sub \
  --bigtableInstance=foobar-product \
  --bigtableTableId=shase_hello \
  --dataflowStagingLocation=gs://dataflow-app-foobar-dev/stg/ \
  --dataflowTempLocation=gs://dataflow-app-foobar-dev/temp/ \
"

動作テスト

mvn でデプロイ後、Job がウォームアップするまでに 5分ほど必要です。

gcloud コマンドで、Pub/Sub と Bigtable にアクセスできるように iam の設定等をしておいてください。

Pub/Sub に サンプルデータの投入

$ gcloud pubsub topics publish shase_hello --message '{"id":"abc01","value1":"xyz"}'

Bigtable でデータの参照

$ cbt -project foobar-dev -instance foobar-product read shase_hello
----------------------------------------
abc01
  sample:id                                @ 2022/02/08-12:55:17.896000
    "abc01"
  sample:value1                            @ 2022/02/08-12:55:17.896000
    "xyz"

CI/CD

テストについては、ビジネスロジック部分に、普通に Junit でテストコードを書いています。それを $ mvn test で実行できるようにしておき、Cloud Build に設定することで、CIを実現しています。

また、CD についても、前述の通り mvn コマンドでビルドとデプロイを実行できるので、それを Cloud Build の step として組み込んでいます。

スケールアウトの考え方

別途、EnableStreamingEngine Optionを有効にすることで、Streaming Engine 機能が有効になります。

Streaming Engine 機能を有効にすることによって、「水平自動スケーリング 」が有効になり、Worker 数の手動調整が不要となります。

今回のシステムでは、スケールアウトが必要な job については、Streaming Engine 機能を有効にして、スケールアウトさせることを想定しています。

おわりに

今回は、Dataflow でのデータパイプライン構築について、紹介させて頂きました。

先日、新しい Dataflow の仕組みである、Dataflow Prime が発表されました。より性能がよくなっているようです。

まだ、Preview のため本番では使っていませんが、そのうち GA になるので、今後の検証ネタとしたいです。