こんにちは、データプラットフォームチームの鳥山(id: to_lz1)とマイン(id: manhnguyen1998)です。
Classiでは、AWS上にあるサービスが出力したログをBigQueryに連携するプロダクト「Seneka」を開発し*1、社内の開発者・分析者に役立ててもらっています。
ログの連携はこれまでバッチ処理で行っていたのですが、この夏に技術的なチャレンジも兼ねてニアリアルタイムでの連携が出来るように移行しました。そこで、この記事で移行前後の構成と、移行時に気をつけたことについて、紹介できればと思います。
Senekaのそれまでの構成
改善方法を議論する前に、まずは従来の構成について簡単に紹介します。
- アプリケーションまたはロードバランサーからS3にログファイルがアップロードされる
- Cloud Composerで毎時実行されるDAGにより、以下の処理を実行
- Storage Transfer ServiceでS3からGoogle Cloud Storage(以下、GCS)へデータを転送
- GCSからStorage Write APIを使ってBigQuery上の一時テーブルにデータをInsert
- 一時テーブルのデータとデータレイクのテーブルとをマージ
従来の構成が完全に悪いわけではありませんが、毎時定期的に実行されるため、開発者はログの反映まで最大60分程度待つ必要がありました。
この構成のままでも、単純にバッチの実行頻度を上げればリアルタイム性を上げられます。しかし、それまでの作りの関係上、ファイルによっては何度も無駄に取り込みが行われてしまうといった問題がありました。このように、バッチ処理のアプローチのままではコスト面でも実行時間面でも限界があるため、イベントドリブンの構成にチャレンジしてみる価値は十分にあると判断しました。
Senekaの新しい構成
新しい構成は以下のようなものです。
- アプリケーションまたはロードバランサーからS3にログファイルがアップロードされる
- S3にファイルが置かれた時点でPutObjectイベントが発生し、SQSに通知が送られる
- GCP側でStorage Transfer Serviceのタスクが作成され、SQSメッセージをトリガーとして実行される
- GCSにファイルが置かれた時点でFinalizedイベントが発行され、それがPub/SubにPublishされる
- Pub/SubトリガーによってCloud Functions*2が起動される
- BigQueryへのInsertの方式は従来と同様
結果
ログがBigQueryでクエリできるようになるまで、従来は最大60分程度かかっていましたが、新しい構成では数分でクエリ可能になりました。
今回、効果を正確に把握するために、あるELBの1日分のログを対象に「BigQueryに連携されるまでどれくらいの時間がかかったのか」を集計してみました。
上記から、最大6分未満でログが連携される世界になった、と言えそうです。すべてのログを調べたわけではありませんが、ECSサービスを含めた他のいくつかのログに関しても概ね同様の結果となっていました。
また、イベントドリブン化によってコストが増えることもなく、逆に約10%削減することができました。
21日はリリース前、22日-25日の間は検証期間で一時的にコストが増加しました(後述します)が、チューニング後の26日に本番リリースして以降はコストが削減出来ています。
工夫した点
移行においては以下のようなことを工夫しました。
Design Docによる認識共有
Classiには、Design Docを書く文化があります。
実装に取りかかる前にしっかりと調査を行い、その調査結果をもとにシステムを設計しました。提案のメリット・デメリットを明確にしたうえで、チーム内で意見を共有し、方針を確定してから実装に移るという流れです。
また、選定する技術については、以下の観点で調査・検討をしました。
サービスごとのクォータ・上限
クラウドサービスにはしばしば上限があります。システムの要求に対して、この上限を超えた場合に問題が生じる可能性があるため、将来的にこの上限を超える可能性があるかどうかを予測してサービスを選びます。例えば、Pub/SubトリガーのCloud Functionsの最大実行時間は9分です。もしこの9分を超える場合にどう対応するかなど、こうした上限を常に考慮しています。
サービスごとのコスト体系や無料枠
これは非常に重要なポイントです。選んだサービスのコストプランがどのようになっているか、また、どのようにコストを削減できるかを検討する必要があります。例えば、Cloud Functionsは月に200万リクエストまでは無料なので、Classiの規模であればイベントドリブンで実装してもコストを抑えることができると判断しました。
段階的な移行
新しい構成へ一度に移行するのはリスクが高いため、段階的に移行を進めました。
まず、検証のためにStaging環境で一部のバケットに対して新しい構成を適用し、影響を確認しました。検証中に、BigQueryのスキャンコストが高くなる現象が起きました。Staging環境でもそれまでと比べて数倍になっており、このまま本番環境に移行すると、1日数TB〜数十TBものスキャンが発生する可能性があると予想しました。
調査の結果、1日あたり24回のバッチからイベントドリブンの処理に移行したことで、テーブルへの書き込み頻度が大幅に増加しており、クエリの数が1日あたり数万件に達することがわかりました。
従来のInsert処理は、Delete -> Insert の流れで行われており、この構成には以下のメリットがありました。
- データ取り込みの冪等性(Idempotency)を維持できる
- 既存のファイルが編集された場合でも再取り込みが可能
しかし、レコードをDeleteするときにどうしても走査が発生してしまい、それがスキャン量の高騰の主要因になっていました。今回の要件ではアプリケーションログが取り込み対象であるため、既存のファイルが編集される可能性は極めて低いといえます。レコードが所属するファイルの存在を確認するだけで十分であるため、より単純なInsert処理に移行しても問題ないと結論付けました。
本番環境に適用する際は、データの挿入先をテスト用のデータセットに変更し、旧構成と新構成でデータ内容に差異がないかを確認しました。差分がないことが確認できた後、安全に切り替えを行い、本番環境に移行しました。
段階的に移行を進めた結果、コストの問題や本番環境でしかわからないキャパシティの問題(後述します)にも、焦らず対処できるようになりました。
リリース前後に見つかった課題への対応
段階的に対応を進める中で、スキャン量以外にもいくつか問題が見つかり、さらなる改善が必要でした。
コストチューニング
本番環境でテスト用データセットへの挿入を稼働させた直後、ログの生成頻度が予想以上に高く、コストも非常に高くなってしまいました。先ほどのグラフで発生していたコストの一時的な山は、このときに発生したものです。
分析の結果、以下の3つの問題が見つかりました。
(1) スケーリング不足でリトライが頻発していた問題
最初に設定していたCloud Functionsのスケール上限を超えるリクエストが発生してしまい、リトライが大量発生したためにコストが増大していました。このため、Cloud Functionsのスケール上限を以下の通り増やしました。
- インスタンスのメモリの使用率はまだ25%程度だったため、同時実行数を2に増やし、1つのインスタンスで2つのリクエストを処理できるようにした
- 最大インスタンス数を300に拡大した
この対応により、スケールの問題は解決できました。
(2) GCSのClass A操作数が多く課金が増加した問題
Cloud Billingのレポートを見ると、Cloud Storage ClassA OperationというSKUがコストの一因になっていました。
ドキュメントを確認すると、storage.objects.list
というAPIの呼び出しがClass A操作に含まれており、コード内で listObjects メソッドを使用していることと関連していそうでした。
そもそも listObjects を使っていたのは、毎時のバッチで処理していた頃には1回の実行で複数のログファイルを処理する可能性があったためです。
今回は既存のコードを Functions Framework でラップすることで開発を省力化していました。そのため、かつて必要だった listObjects を呼び出す処理が残ったままとなり、これが無駄に動き続けていたのです。最終的に、該当の処理を getObject に変更し、無事にコストを削減できました。
(3) Data Lineage APIの課金が増加した問題
Data Lineage は Dataplex の一機能で便利なサービスですが、ログ連携用のプロジェクトにおいてはほとんど使用しておらず、また一時テーブルを大量に作る関係でコストが無視できない水準まで増加していました。利用を止めることで発生するデメリットもなかったため、APIを無効化してコストを削減しました。
上記の対応により、最終的に一日あたりのコストを移行前よりも低く抑えることができました。
メモリリーク解消
リリースしてから4日後、突然「Out Of Memory」というエラーが発生しました。メモリの使用状況を確認したところ、メモリが段階的に増加していることがわかり、メモリリークの問題が考えられました。
Goのpprofを使用してメモリ利用を分析した結果、解放されていないBigQueryのgRPCクライアントが見つかりました。そのクライアントを解放することで問題を解決しました。
リーク箇所の修正リリース以後は、メモリの使用状況やインスタンス台数が安定することを確認できました。
まとめ
以上、アプリケーションログの連携基盤にイベントドリブンの転送を用い、連携速度を大幅に向上させた話をお届けしました。
リアルタイムでログを分析したい、という要望は一般的なもので、これを可能にするソリューションは数多くあります。近い将来では、例えば BigQuery Omni のようなサービスやプロダクトも試す余地があると考えています*3。
BigQuery Omni を使用すると、BigLake テーブルを使用して、Amazon Simple Storage Service(Amazon S3)または Azure Blob Storage に保存されたデータに対して BigQuery 分析を実行できます。
BigQuery Omniに限らず、昨今登場してきているサービスは 「そもそもデータ転送をしない」 や、 「自前で仕組みを作る手間が要らない」 という方向性を指向しているように思います。
これは非常に大事な考え方です。データ基盤の開発と運用そのものに工数を使うよりも、貯めたデータを使ってどうやって価値を生み出すかという部分にフォーカスできた方がエンジニア組織としてのインパクトは大きくなるからです。
しかしながら、こうして自前で仕組みを設計・実装・運用することから得られる学びは大きく、そのことがまたインパクトを生み出すための力になるという側面もあるでしょう。
旧来のログ連携基盤による毎時の連携でも、「ものすごく困っている!絶対に数分で連携してほしい!」という声はなかったわけですが、この伸びしろにあえて挑んでみて得られた学びは大きく、また最終的にも成功を収める事ができました。事業インパクトを直接生み出す「攻め」でもなく、データ基盤の安定を図る「守り」でもなく、技術的な新しさを取り入れるチャレンジをする「遊び」として、個人的には非常に良い取り組みだったと思っています。
We are Hiring!
最後に取り上げた「攻めと守り、そして遊び」は、私たちデータプラットフォームチームが属するプラットフォーム部の今期テーマでもあります。
部長であるlacolacoと1on1をしていて出てきたフレーズなのですが、自分もとても気に入っており今季だけと言わずエンジニアリングをするに当たってずっと大切にしたい三要素です。
末筆となりますが、そんな「攻めと守り、そして遊び」の鼎立に挑むエンジニア組織で一緒に働きたい仲間を募集中です!少しでも気になった方は、以下求人からご応募・カジュアル面談の申し込みをお待ちしております。
*1: Classiのデータ関連システムには、哲学者の名前をつける慣習があります。 ref: https://tech.classi.jp/entry/2021/05/31/120000
*2: Google Cloudから2024-08-22にCloud Run Functionsへのリブランディングが発表されましたが、本記事中では Cloud Functions と呼称します
*3:クエリパフォーマンスの懸念や、Order By句の使用等に関する様々な制限、またそもそもまだ東京リージョンに来ていないといったいろいろな制約から、今回はまだ時期尚早であろうと判断しました