はじめに
こんにちは、@shase です。
スタディサプリでは、データパイプラインのツールとして、従来 AWS Kinesis Stream や、Embulk や、AWS Lambda などがよく使われてきました。
ただ、現在開発中のプロジェクトでは、システム間の連携の為、Cloud Pub/Sub が多用されているということもあり、データパイプライン Cloud Pub/Subとの親和性が高いCloud Dataflowを一部取り入れています。
本記事では Cloud Dataflow 自体は詳述しませんが、簡単に説明させていただくと、Cloud Dataflowとは、GCP が提供するマネージドな Apache Beam の実行環境になります。
Cloud Dataflow のメリット
Cloud Dataflow(Apache Beam)には、以下のようなメリットを感じています。
- ストリーミング処理が可能であること
- データの逐次連携が可能になり、ミニバッチ的な処理より準同期的にデータ連携ができるのはうれしいです。
- ストリーミング途中にロジックを挟めること
- データをストアするまでに、ロジックを挟めることがうれしいです。日付を加工したり、形式を加工したりなどデータパイプラインでできると有り難いロジックは多いです。
- マネージド、かつスケーラビリティがあること
この仕組みは、将来的に単にデータデータ連携というだけではなく、MLの為のデータパイプラインのベースになると考えています。
技術選定の詳細
どのSDKを使うか
Cloud Dataflow(Apache Beam) は、プログラマブルにデータパイプラインを記述することができるものです。
Dataflow の ドキュメントによると、現在は Java SDK と Python SDK が対応しています。
それぞれの SDK で対応している GCP のコンポーネントが異なり、使う前に確認が必要です。
データチームでは、 Python を主に使っているのですが、今回は Bigtable が Python SDK で Supported ではないので、Java SDK を使うことにしました。
などの選択肢があります。
scio は、 Spotify が開発している Apache Beam のパイプラインを Scala で記述するためのライブラリです。
今回、少し検証の時間をもらったので、Java でそのまま記述するパターンと、scio を使ってSala で記述するパターンの両方でプロトタイピングをしてみました。
Java or Scala
結論からいうと、シンプルなパイプラインを記述する分には、Java でも Scala でも遜色なく記述することができました。
しかし、Scala (scio) を使ったパターンは、あまりにも情報が少なく(ほぼ公式サイトぐらいの情報しかない)、(散々試行錯誤した)自分が実装したものを引き継ぐという観点で、問題があると感じました(Javaのほうは、公式サイト、Stackoverflow、GitHub などに参考にできる情報が数多くありました)。
というわけで、今回は保守性の観点から Java で記述することにしました。
サンプルコード
実際の実装はビジネスロジックが入っているので、あくまでもサンプルですが、Java で記述した場合、コードは以下のようになります。
処理としては
. ├── pom.xml ├── src │ ├── main │ │ ├── java │ │ │ └── org │ │ │ └── example │ │ │ └── Sample.java
Sample.java
package org.example; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.cloud.bigtable.beam.CloudBigtableIO; import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PDone; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import java.util.LinkedHashMap; import java.util.Map; public class Sample { static class GeneralTransformFn extends DoFn<String, Mutation> { String bigtableCf = ""; GeneralTransformFn(String bigtableCf) { this.bigtableCf = bigtableCf; } @ProcessElement public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) { try { ObjectMapper mapper = new ObjectMapper(); Map<String, String> jsonValue = mapper.readValue(rowkey, LinkedHashMap.class); long timestamp = System.currentTimeMillis(); Put row = new Put(Bytes.toBytes(jsonValue.get("id"))); jsonValue.entrySet().stream().forEach( e -> row.addColumn( Bytes.toBytes(this.bigtableCf), Bytes.toBytes(e.getKey()), timestamp, Bytes.toBytes(e.getValue())) ); out.output(row); } catch (Exception e) { // 例外処理 } } } public static interface StreamOptions extends DataflowPipelineOptions { @Validation.Required ValueProvider<String> getProjectId(); void setProjectId(ValueProvider<String> value); @Validation.Required ValueProvider<String> getInputSubscription(); void setInputSubscription(ValueProvider<String> value); @Validation.Required ValueProvider<String> getBigtableInstance(); void setBigtableInstance(ValueProvider<String> value); @Validation.Required ValueProvider<String> getBigtableTableId(); void setBigtableTableId(ValueProvider<String> value); @Validation.Required ValueProvider<String> getDataflowStagingLocation(); void setDataflowStagingLocation(ValueProvider<String> value); @Validation.Required ValueProvider<String> getDataflowTempLocation(); void setDataflowTempLocation(ValueProvider<String> value); } public static void main(String[] args) { StreamOptions streamOptions = PipelineOptionsFactory .fromArgs(args) .withValidation() .as(StreamOptions.class); PipelineOptions options = PipelineOptionsFactory.create(); DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); dataflowOptions.setRunner(DataflowRunner.class); dataflowOptions.setProject(streamOptions.getProjectId().toString()); dataflowOptions.setNumWorkers(1); dataflowOptions.setRegion("asia-northeast1"); dataflowOptions.setStreaming(true); dataflowOptions.setJobName("sample-job"); dataflowOptions.setStagingLocation(streamOptions.getDataflowStagingLocation().toString()); dataflowOptions.setGcpTempLocation(streamOptions.getDataflowTempLocation().toString()); CloudBigtableTableConfiguration bigtableTableConfig = new CloudBigtableTableConfiguration.Builder() .withProjectId(streamOptions.getProjectId().toString()) .withInstanceId(streamOptions.getBigtableInstance().toString()) .withTableId(streamOptions.getBigtableTableId().toString()) .build(); Pipeline p = Pipeline.create(dataflowOptions); PDone pCol = p.apply("read", PubsubIO.readStrings().fromSubscription(streamOptions.getInputSubscription().toString())) .apply("transform", ParDo.of(new GeneralTransformFn("sample"))) .apply("write", CloudBigtableIO.writeToTable(bigtableTableConfig)); p.run(); } }
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>sample-app</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>2.31.0</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>2.31.0</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>2.31.0</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-core-java</artifactId> <version>2.31.0</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>2.31.0</version> </dependency> <dependency> <groupId>com.google.cloud.bigtable</groupId> <artifactId>bigtable-hbase-beam</artifactId> <version>1.23.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.12.4</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.8.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.8.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>3.0.0</version> </plugin> </plugins> </build> </project>
ビルドとデプロイ
Dataflow の実行方式はいくつかあります。
- 従来のテンプレートをデプロイして、Jobを作成する方法
- Flex テンプレートをデプロイして、Job を作成する方法
- Direct 実行
それぞれメリット・デメリットがあります。テンプレートを用いた方式は、同じようで少し違うJobを多く作成する場合に便利だと思います。今回はさほど多くの Job を作成したいというわけでもないので、Direct 実行しています。
以下の環境が必要です。
今回作成したコードのビルドとデプロイは以下のようになります。
動作につかう、サービスアカウントには、Dataflow の実行に必要な権限を付与して、GOOGLE_APPLICATION_CREDENTIALS
環境変数にセットしてください。
$ export GOOGLE_APPLICATION_CREDENTIALS=./credential.json $ mvn compile exec:java \ -Dexec.cleanupDaemonThreads=false \ -Dexec.mainClass="org.example.Sample" \ -Dexec.args=" \ --projectId=foobar-dev \ --inputSubscription=projects/foobar-dev/subscriptions/shase_hello-sub \ --bigtableInstance=foobar-product \ --bigtableTableId=shase_hello \ --dataflowStagingLocation=gs://dataflow-app-foobar-dev/stg/ \ --dataflowTempLocation=gs://dataflow-app-foobar-dev/temp/ \ "
動作テスト
mvn でデプロイ後、Job がウォームアップするまでに 5分ほど必要です。
gcloud コマンドで、Pub/Sub と Bigtable にアクセスできるように iam の設定等をしておいてください。
Pub/Sub に サンプルデータの投入
$ gcloud pubsub topics publish shase_hello --message '{"id":"abc01","value1":"xyz"}'
Bigtable でデータの参照
$ cbt -project foobar-dev -instance foobar-product read shase_hello ---------------------------------------- abc01 sample:id @ 2022/02/08-12:55:17.896000 "abc01" sample:value1 @ 2022/02/08-12:55:17.896000 "xyz"
CI/CD
テストについては、ビジネスロジック部分に、普通に Junit でテストコードを書いています。それを $ mvn test
で実行できるようにしておき、Cloud Build に設定することで、CIを実現しています。
また、CD についても、前述の通り mvn コマンドでビルドとデプロイを実行できるので、それを Cloud Build の step として組み込んでいます。
スケールアウトの考え方
別途、EnableStreamingEngine Optionを有効にすることで、Streaming Engine 機能が有効になります。
Streaming Engine 機能を有効にすることによって、「水平自動スケーリング 」が有効になり、Worker 数の手動調整が不要となります。
今回のシステムでは、スケールアウトが必要な job については、Streaming Engine 機能を有効にして、スケールアウトさせることを想定しています。
おわりに
今回は、Dataflow でのデータパイプライン構築について、紹介させて頂きました。
先日、新しい Dataflow の仕組みである、Dataflow Prime が発表されました。より性能がよくなっているようです。
まだ、Preview のため本番では使っていませんが、そのうち GA になるので、今後の検証ネタとしたいです。