こんにちは、データプラットフォームチームでデータエンジニアをやっている滑川(@tomoyanamekawa)です。
データプラットフォームチームはデータAI部のメンバーで構成されていて、データ分析基盤を中心としたデータ活用に関するシステムに責務を持つチームです。
データAI部が出来てから3年が経ち、データ分析基盤を今の形で運用をして1年半が経過しました。
データエンジニアの採用活動の中でデータ分析基盤を紹介する必要がある一方、説明コストが高く困っていました。
そこで今回は「ソクラテス」と呼んでいる社内のデータ分析基盤について紹介します。
(データAI部ではシステム基盤に哲学者の名前をつける慣習があります。)
ソクラテスの責務
Classiのデータ分析基盤ではData Lake(DL), Data Warehouse(DWH), Data Mart(DM)の3層構造を採用しており、そのETL(Extract / Transform / Load)処理全般がソクラテスの責務です。
各データをDL用のGCPプロジェクトにあるBigQueryに集約し、DWH, DMを作成するSQLの実行、レポーティングまでを行っています。
現状ではdailyでのバッチ処理を採用しています。
このソクラテスで扱っているデータは下記のものです。
- Amazon RDSに貯まっているデータ
- Webサービスのアクセスログ(Google Analytics 360)
- スマートフォンアプリの行動ログ(Firebase Analytics)
- アプリケーションのログ
全体構成
社内ユーザーが直接アクセスするのはDM/DWH用のBigQueryと各種BIツールです。
Classiの提供するアプリケーションがAWS上に構築されているため、基盤全体としてはAWSとGCPのマルチクラウドの構成になっています。
ワークフローはGlueのみ単体で定期実行していますが、Amazon S3以降の処理はCloud Composerによって実行を管理しています。
主なデータの流れ
データの種類によって経路が違いますが、ここでは一番メインになるRDSのデータをETLする流れを紹介します。
(番号は構成図に振ってある番号に対応しています。)
①RDS→S3
Glueを使い、RDSにあるデータを必要なマスキング処理を行った上でJSON形式でS3に出力しています。
学校ごとにDBが分かれているため、その集約をここでしています。
これらのことを実現するためにGlueのカタログ機能では難しい部分があり、SparkとPython Shellを混ぜつつ自分たちでjob部分をカスタマイズしています。
データのフォーマットはJSON以外にAVROやParquetも検討をしましたが、データ量的にJSONでも問題がなかったため扱いやすいJSONにしました。②S3→GCS
Cloud ComposerからGCPのCloud Storage Transfer Serviceを実行して、GCSに取り込んでいます。
実装当時には対応するairflowのoperatorが実装されていなかったため、クライアントライブラリを使ってoperatorの代わりを実装してPythonOpetator経由で実行しています。③GCS→BigQuery
Cloud ComposerのGoogleCloudStorageToBigQueryOperatorを使ってBigQueryにJSONをloadしています。
loadしたいテーブルが数百あり、schemaも扱いやすいようにコントロールしたいため、YAMLとschemaファイルを別途用意し、load対象を管理しています。④BigQuery内でのquery実行
RDSのデータ以外も含めた全データが揃った段階で実行します。
Cloud ComposerのBigQueryOperatorを使ってqueryを実行し、DWH、DMを作成しています。
このDWH、DM作成部分では工夫している点が多いため詳細は後述します。その他
各処理の間や最後にデータの整合性チェックやSlackへのレポーティング処理も行っています。
運用のポイント
DWH , DMの作成・更新方法
データプラットフォームチームで気にしていることは、簡単にテーブルを追加でき、処理が再実行可能であるようにすることです。
アナリストやディレクターなど色々な職種のデータ利用者にDWH , DMを拡張してもらうためにシンプルであることを意識しています。
Cloud Composer内では具体的に下記のような工夫をしています。
- 定期実行するqueryの追加方法をシンプルにする
Cloud ComposerのDAGを非エンジニア含め、みんなに書いてもらうのは負荷が高いため、実行したいqueryと下記のようなYAMLで宛先のテーブル名、先に実行してほしいtaskなどを書くだけで定期実行するqueryを追加できるようにしています。
tasks: - task_id: task1 file_path: queries/GCPプロジェクト/データセット/task1.sql destination_table: データセット.task1 table_type: table write_disposition: WRITE_TRUNCATE dependent_tasks: - task_id: task2 file_path: queries/GCPプロジェクト/データセット/task2.sql destination_table: データセット.task2 table_type: table write_disposition: WRITE_TRUNCATE dependent_tasks: - task1
- SQL、YAML内で使える日付パラメータの充実
先週分を集計したいなど特定のパターンで集計したい要望があるため、日付パラメータを充実させています。
デフォルトでの日付パラメーターではJSTに対応しておらず、バリエーションも足りていないため下記のように別途作成した関数をuser_defined_macrosでDAGに読み込ませています。
with models.DAG( dag_id=DAG_ID, schedule_interval="00 * * * *", catchup=False, template_searchpath="/home/airflow/gcs/dags/", user_defined_macros={ "jst_ts": convert_timezone.jst_ts, "jst_ds": convert_timezone.jst_ds, }, default_args=default_args, ) as dag:
- 冪等性の担保
エラー時に気兼ねなく再実行したい、queryの変更時に過去分も再実行しデータを洗い替えたいというケースが多々あります。
BigQueryのsharding tableとpartitioned table(keyに日付を利用している場合)では特定の日付分のみを上書きすることが可能なのでその機能と日付パラメータを組み合わせることで冪等になるようにしています。
利用状況モニタリング
BigQueryの利用状況をTableauで可視化しています。
ユーザーごとの利用数だけでなく、テーブルごとの利用数やscan量もみています。
社内でのデータ活用を促進するために現状把握するのが主な使い道ですが、その他にこのような使い方をしています。
- テーブルを整理したいときやschemaを変更したいときに影響を受ける人が特定する
- 効率の悪いqueryを書いている人がいた場合にサポートする
- よく使われるテーブルや処理をDWHとして共通化する
今後の拡張
現場の課題では下記のようなものがあり、これらを順次対応しています。
- DAG(処理の依存関係)が適切に狭められておらず、直接関係がない処理のエラーに引きずられて止まってしまう部分がある
- DWH , DMの依存関係定義が人力になっていて、数が増えるにつれて複雑になりスケールできない
- DWH , DMのメタデータ管理が徹底できておらず、テーブルの仕様が誰でも理解できる状態になっていない
- etc...
そのほかではログのリアルタイム連携や営業データとの連携など、データの種類や鮮度の向上に取り組んでいます。
まとめ
Classiのデータ分析基盤の概要と運用で気にしていることについて紹介しました。
まだ活用できる状態にないデータもあり、今回紹介したソクラテス以外でもデータからユーザーに価値を還元するための取り組みをしています。
やりたいことに対してまだまだデータエンジニアが足りていないため、興味がある方の応募をお待ちしております!